CountDownLatch CyclicBarrier Semaphore
Java 并发编程是整个 Java 开发体系中最难以理解但也是最重要的知识点,也是各类开源分布式框架(如 ZooKeeper、Kafka、Spring Cloud、Netty 等)中各个并发组件实现的基础。J.U.C 并发包,即 java.util.concurrent
包,大大提高了并发性能,是 JDK 的核心工具包,是 JDK 1.5 之后,由 Doug Lea 实现并引入。而 AQS 被认为是 J.U.C 的核心。
AQS 是一个抽象类,并没有对并发类提供了一个统一的接口定义,而是由子类根据自身的情况实现相应的方法,AQS 中一般包含两个方法 acquire(int)、release(int),获取同步状态和释放同步状态,AQS 根据其状态是否独占分为独占模式和共享模式。
- 独占模式:同一时刻最多只有一个线程获取同步状态,处于该模式下,其他线程试图获取该锁将无法获取成功。
- 共享模式:同一时刻会有多个线程获取共享同步状态,处于该模式下,其他线程试图获取该锁可能会获取成功。
同步器根据同步状态分为独占模式和共享模式,独占模式包括类:ReentrantLock
、ReentrantReadWriteLock.WriteLock
,共享模式包括:Semaphore
、CountDownLatch
、ReentrantReadWriteLock.ReadLock
,着重看一下 java.util.concurrent
包下一些辅助同步器类:CountDownLatch
、CyclicBarrier
、Semaphore
、Exchanger
、Phaser
1、CountDownLatch
- 闭锁
简介
CountDownLatch
是一个同步辅助工具类,通过它可以完成类似于阻塞当前线程的功能,也就是一个或多个线程一直等待直到其他线程执行完成。即允许一个或多个线程一直等待,直到其他线程执行完后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有框架服务之后执行。CountDownLatch
用了一个给定的计数器 cnt
来进行初始化,该计数器的操作是原子操作,即同时只能有一个线程操作该计数器,调用该类 await
方法的线程会一直处于阻塞状态,直到其他线程调用 countDown
方法时计数器的值变成 0
,每次调用 countDown
时计数器的值会减 1
,当计数器的值为 0
时所有因 await 方法而处于等待状态的线程就会继续执行。计数器 cnt
是闭锁需要等待的线程数量,只能被设置一次,且 CountDownLatch
没有提供任何机制去重新设置计数器 count
,如果需要重置,可以考虑使用 CyclicBarrier
。
CountdownLatch_example
使用场景
(1)开启多个线程分块下载一个大文件,每个线程只下载固定的一截,最后由另外一个线程来拼接所有的分段。
(2)应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。
(3)确保一个计算不会执行,直到所需要的资源被初始化。
(4)并行计算,处理量很大时可以将运算任务拆分成多个子任务,当所有子任务都完成之后,父任务再将所有子任务都结果进行汇总。
主要接口分析
CountDownLatch
内部依赖 Sync
实现,而 Sync
继承 AQS。CountDownLatch
关键接口如下:
countDown()
如果当前计数器的值大于1
,则将其减1
;若当前值为1
,则将其置为0
并唤醒所有通过await
等待的线程;若当前值为0
,则什么也不做直接返回。await()
等待计数器的值为0
,若计数器的值为0
则该方法返回;若等待期间该线程被中断,则抛出InterruptedException
并清除该线程的中断状态。await(long timeout, TimeUnit unit)
在指定的时间内等待计数器的值为0
,若在指定时间内计数器的值变为0
,则该方法返回true
;若指定时间内计数器的值仍未变为0
,则返回false
;若指定时间内计数器的值变为0
之前当前线程被中断,则抛出InterruptedException
并清除该线程的中断状态。getCount()
读取当前计数器的值,一般用于调试或者测试。Coding 演示
(1)作为一个开关 / 入口
将初始计数值为 1 的CountDownLatch
作为一个的开关或入口,在调用countDown()
的线程打开入口前,所有调用await
的线程都一直在入口处等待。public class Driver {
private static final Integer WORK_COUNT = 10;
private static final ExecutorService executorService = Executors.newCachedThreadPool();
public static void main(String[] args) {
// 初始化计数器为 10 的 CountDownLatch
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < WORK_COUNT; i++) {
executorService.execute(new Worker(countDownLatch));
}
// 主线程执行
doSomething();
// 主线程开启开关
countDownLatch.countDown();
// 平滑地关闭 ExecutorService
executorService.shutdown();
}
private static void doSomething() {
// ...
System.out.print("start..");
}
}
class Worker implements Runnable {
private final CountDownLatch countDownLatch;
Worker(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
// 所有执行线程在此处等待开关开启 [多个子线程同时执行]
countDownLatch.await();
// 子线程执行
doWork();
} catch (InterruptedException ignored) {
}
}
private void doWork() {
// ...
System.out.print("run..");
}
}
// Output
// start..run..run..run..run..run..run..run..run..run..run..
(2)作为一个完成信号
将初始计数值为 N 的CountDownLatch
作为一个完成信号点,使某个线程在其它 N 个线程完成某项操作之前一直等待。public class Driver {
private static final Integer WORK_COUNT = 10;
private static final ExecutorService executorService = Executors.newCachedThreadPool();
public static void main(String[] args) throws InterruptedException {
// 初始化计数器为 10 的 CountDownLatch
CountDownLatch countDownLatch = new CountDownLatch(WORK_COUNT);
for (int i = 0; i < WORK_COUNT; i++) {
executorService.execute(new Worker(countDownLatch));
}
// 主线程等待其它 N 个线程完成
countDownLatch.await();
// 主线程执行
doSomething();
// 平滑地关闭 ExecutorService
executorService.shutdown();
}
private static void doSomething() {
// ...
System.out.println("end");
}
}
class Worker implements Runnable {
private final CountDownLatch countDownLatch;
Worker(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
// 子线程执行
doWork();
// 每个线程做完自己的事情后, 就将计数器减去 1
countDownLatch.countDown();
}
private void doWork() {
// ...
System.out.print("run..");
}
}
// Output
// run..run..run..run..run..run..run..run..run..run..end
2、
CyclicBarrier
- 循环栅栏简介
CyclicBarrier
和CountDownLatch
是非常类似的,CyclicBarrier
核心的概念是在于设置一个等待线程的数量边界,到达了此边界之后进行执行。CyclicBarrier
也是一个同步辅助工具类,它允许一组线程相互等待直到到达某个公共的屏障点(Common Barrier Point),通过它可以完成多个线程之间相互等待时,只有当每个线程都准备就绪后才能各自继续执行后面的操作。CyclicBarrier
也是通过计数器来实现,当某个线程调用await
方法后就进入等待状态,计数器执行加一操作。当计数器的值达到了设置的初始值时等待状态的线程会被唤醒继续执行。通过调用CyclicBarrier
对象的await()
方法,两个线程可以实现互相等待。一旦 N 个线程在等待CyclicBarrier
达成,所有线程将被释放掉去继续执行。由于CyclicBarrier
在释放等待线程后可以重用,所以可以称之为循环栅栏。
使用场景
CyclicBarrier
特别适用于并行迭代计算,每个线程负责一部分计算,然后在栅栏处等待其他线程完成,所有线程到齐后,交换数据和计算结果,再进行下一次迭代。
主要接口分析
CyclicBarrier
并没有自己去实现 AQS 框架的 API,而是利用了 ReentrantLock
和 Condition
。CyclicBarrier
提供的关键方法如下:
await()
等待其它参与方的到来(调用await()
)。如果当前调用是最后一个调用,则唤醒所有其它的线程的等待并且如果在构造CyclicBarrier
时指定了action
,当前线程会去执行该action
,然后该方法返回该线程调用 await 的次序(getParties()-1
说明该线程是第一个调用await
的,0
说明该线程是最后一个执行await
的),接着该线程继续执行await
后的代码;如果该调用不是最后一个调用,则阻塞等待;如果等待过程中,当前线程被中断,则抛出InterruptedException
;如果等待过程中,其它等待的线程被中断,或者其它线程等待超时,或者该barrier
被reset
,或者当前线程在执行barrier
构造时注册的action
时因为抛出异常而失败,则抛出BrokenBarrierException
。await(long timeout, TimeUnit unit)
与await()
唯一的不同点在于设置了等待超时时间,等待超时时会抛出TimeoutException
。reset()
该方法会将该barrier
重置为它的初始状态,并使得所有对该barrier
的await
调用抛出BrokenBarrierException
。
CyclicBarrier
提供的两个构造函数:
CyclicBarrier(int parties)
:parties
表示拦截线程的数量。创建一个新的CyclicBarrier
,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动barrier
时执行预定义的操作。CyclicBarrier(int parties, Runnable barrierAction)
:barrierAction
为CyclicBarrier
接收的Runnable
命令,用于在线程到达屏障时,优先执行barrierAction
,用于处理更加复杂的业务场景。创建一个新的CyclicBarrier
,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动barrier
时执行给定的屏障操作,该操作由最后一个进入barrier
的线程执行。Coding 演示
(1)简单例子
public class Solver {
private static final Integer WORK_COUNT = 10;
private static final ExecutorService executorService = Executors.newCachedThreadPool();
public static void main(String[] args) {
// 初始化计数器为 10 的 CyclicBarrier
CyclicBarrier cyclicBarrier = new CyclicBarrier(WORK_COUNT);
for (int i = 0; i < WORK_COUNT; i++) {
executorService.execute(new Worker(cyclicBarrier));
}
// 平滑地关闭 ExecutorService
executorService.shutdown();
}
}
class Worker implements Runnable {
private final CyclicBarrier cyclicBarrier;
Worker(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.print("before..");
try {
// 多个线程之间相互等待时,只有当每个线程都准备就绪后才能各自继续执行后面的操作
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
// 子线程执行
doWork();
}
private void doWork() {
// ...
System.out.print("after..");
}
}
// Output
// before..before..before..before..before..before..before..before..before..before..after..after..after..after..after..after..after..after..after..after..
(2)执行
barrierAction
在
ready
状态时日志是每秒输出一条,当有 5 条ready
时会一次性输出 5 条continue
。这就是前面讲的全部线程准备就绪后同时开始执行。在初始化CyclicBarrier
时还可以在等待线程数后指定一个runnable
,含义是当线程到达这个屏障时优先执行这里的runnable
。public class Solver {
private static final Integer WORK_COUNT = 10;
private static final ExecutorService executorService = Executors.newCachedThreadPool();
public static void main(String[] args) throws InterruptedException {
// 初始化计数器为 5 的 CyclicBarrier
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println(String.format("%s call back is ready.", Thread.currentThread().getName())));
for (int i = 0; i < WORK_COUNT; i++) {
Thread.sleep(1000);
executorService.execute(new Worker(cyclicBarrier));
}
// 平滑地关闭 ExecutorService
executorService.shutdown();
}
}
class Worker implements Runnable {
private final CyclicBarrier cyclicBarrier;
Worker(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
System.out.println(String.format("%s is ready", Thread.currentThread().getName()));
cyclicBarrier.await();
System.out.println(String.format("%s continue", Thread.currentThread().getName()));
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
// Output
// pool-1-thread-1 is ready
// pool-1-thread-2 is ready
// pool-1-thread-3 is ready
// pool-1-thread-4 is ready
// pool-1-thread-5 is ready
// pool-1-thread-5 call back is ready.
// pool-1-thread-5 continue
// pool-1-thread-3 continue
// pool-1-thread-4 continue
// pool-1-thread-1 continue
// pool-1-thread-2 continue
// pool-1-thread-5 is ready
// pool-1-thread-1 is ready
// pool-1-thread-4 is ready
// pool-1-thread-2 is ready
// pool-1-thread-3 is ready
// pool-1-thread-3 call back is ready.
// pool-1-thread-3 continue
// pool-1-thread-5 continue
// pool-1-thread-4 continue
// pool-1-thread-2 continue
// pool-1-thread-1 continue
CyclicBarrier
与CountDownLatch
区别CyclicBarrier
与CountDownLatch
可能容易混淆,强调下其区别:CountDownLatch
的参与线程是有不同角色的,有的负责倒计时,有的在等待倒计时变为 0,负责倒计时和等待倒计时的线程都可以有多个,它用于不同角色线程间的同步。CyclicBarrier
的参与线程角色是一样的,用于同一角色线程间的协调一致。CountDownLatch
是一次性的,而CyclicBarrier
是可以重复利用的。3、
Semaphore
- 信号量简介
Semaphore
,又名信号量,这个类的作用有点类似于 “许可证”。信号量Semaphore
是一个控制访问多个共享资源的计数器,和CountDownLatch
一样,其本质上是一个 “共享锁”。从源码角度来看,Semaphore 的实现方式和CountDownLatch
非常相似,基于 AQS 做了一些定制。通过维持 AQS 的锁全局计数 state 字段来实现定量锁的加锁和解锁操作。Semaphore
通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。
有时,因为一些原因需要控制同时访问共享资源的最大线程数量,比如出于系统性能的考虑需要限流,或者共享资源是稀缺资源,需要有一种办法能够协调各个线程,以保证合理的使用公共资源。当有线程想要访问共享资源时,需要先获取 (acquire
) 的许可;如果许可不够了,线程需要一直等待,直到许可可用。当线程使用完共享资源后,可以归还 (release
) 许可,以供其它需要的线程使用;然而,实际上并没有真实的许可证对象供线程使用,Semaphore 只是对可用的数量进行管理维护。
使用场景
Semaphore
可以用于做流量控制,特别公用资源有限的应用场景,比如数据库连接。主要接口分析
Semaphore
内部包含公平锁(FairSync
)和非公平锁(NonfairSync
),继承内部类Sync
,其中Sync
继承 AQS,作为Semaphore
的公平锁和非公平锁的基类。CyclicBarrier
提供的关键方法如下:isFair()
:是否公平模式 FIFOavailablePermits()
:获取当前可用的许可证数量acquire()
:当前线程尝试去阻塞的获取 1 个许可证。此过程是阻塞的,它会一直等待许可证,直到发生以下任意一件事:当前线程获取了 1 个可用的许可证,则会停止等待,继续执行;当前线程被中断,则会抛出InterruptedException
异常,并停止等待,继续执行。acquire(permits)
:当前线程尝试去阻塞的获取 permits 个许可证。此过程是阻塞的,它会一直等待许可证,直到发生以下任意一件事:当前线程获取了 n 个可用的许可证,则会停止等待,继续执行;当前线程被中断,则会抛出InterruptedException
异常,并停止等待,继续执行。acquierUninterruptibly()
:当前线程尝试去阻塞的获取 1 个许可证 (不可中断的)。此过程是阻塞的,它会一直等待许可证,直到发生以下任意一件事:当前线程获取了 1 个可用的许可证,则会停止等待,继续执行。acquireUninterruptibly(permits)
:当前线程尝试去阻塞的获取 permits 个许可证。此过程是阻塞的,它会一直等待许可证,直到发生以下任意一件事:当前线程获取了 n 个可用的许可证,则会停止等待,继续执行。tryAcquire()
:当前线程尝试去获取 1 个许可证。此过程是非阻塞的,它只是在方法调用时进行一次尝试。如果当前线程获取了 1 个可用的许可证,则会停止等待,继续执行,并返回true
。如果当前线程没有获得这个许可证,也会停止等待,继续执行,并返回false
。tryAcquire(permits)
:当前线程尝试去获取permits
个许可证。此过程是非阻塞的,它只是在方法调用时进行一次尝试。如果当前线程获取了 permits 个可用的许可证,则会停止等待,继续执行,并返回true
。如果当前线程没有获得permits
个许可证,也会停止等待,继续执行,并返回false
。tryAcquire(timeout, TimeUnit)
:当前线程在限定时间内,阻塞的尝试去获取 1 个许可证。此过程是阻塞的,它会一直等待许可证,直到发生以下任意一件事:当前线程获取了可用的许可证,则会停止等待,继续执行,并返回 true;当前线程等待时间timeout
超时,则会停止等待,继续执行,并返回false
;当前线程在timeout
时间内被中断,则会抛出InterruptedException
一次,并停止等待,继续执行。tryAcquire(permits, timeout, TimeUnit)
:当前线程在限定时间内,阻塞的尝试去获取permits
个许可证。此过程是阻塞的,它会一直等待许可证,直到发生以下任意一件事:当前线程获取了可用的permits
个许可证,则会停止等待,继续执行,并返回true
;当前线程等待时间timeout
超时,则会停止等待,继续执行,并返回false
;当前线程在timeout
时间内被中断,则会抛出InterruptedException
一次,并停止等待,继续执行。release()
:当前线程释放 1 个可用的许可证。release(permits)
:当前线程释放 permits 个可用的许可证。drainPermits()
:当前线程获得剩余的所有可用许可证。hasQueuedThreads()
:判断当前Semaphore
对象上是否存在正在等待许可证的线程。getQueueLength()
:获取当前Semaphore
对象上是正在等待许可证的线程数量。
Semaphore
提供了两个构造函数:
Semaphore(int permits)
:创建具有给定的许可数和非公平的公平设置的Semaphore
,Semaphore
默认选择非公平锁。Semaphore(int permits, boolean fair)
:创建具有给定的许可数和给定的公平设置的Semaphore
。Semaphore
有两种模式,公平模式和非公平模式。公平模式就是调用acquire
的顺序就是获取许可证的顺序,遵循 FIFO;而非公平模式是抢占式的,也就是有可能一个新的获取线程恰好在一个许可证释放时得到了这个许可证,而前面还有等待的线程,简单的说就是随机选取新线程来运行。Coding 演示
public class SemaphoreExample {
public static void main(String[] args) {
final int clientCount = 3;
final int totalRequestCount = 10;
Semaphore semaphore = new Semaphore(clientCount);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalRequestCount; i++) {
executorService.execute(()->{
try {
semaphore.acquire();
System.out.print(semaphore.availablePermits() + " ");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
});
}
executorService.shutdown();
}
}
// Output
// 2 1 2 2 2 2 2 1 2 2
4、Exchanger - 交换器
简介
Exchanger(交换器)是一个用于线程间协作的工具类,是 JDK 1.5 开始提供的一个用于两个工作线程之间交换数据的封装工具类。Exchanger 有点类似于 CyclicBarrier ,CyclicBarrier 是一个栅栏,到达栅栏的线程需要等待其它一定数量的线程到达后,才能通过栅栏,Exchanger 可以看成是一个双向栅栏。它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。
可简单地将 Exchanger 对象理解为一个包含两个格子的容器,通过 exchanger 方法可以向两个格子中填充信息。当两个格子中的均被填充时,该对象会自动将两个格子的信息交换,然后返回给线程,从而实现两个线程的信息交换。这两个线程通过 exchange 方法交换数据,如果第一个线程先执行 exchange 方法,它会一直等待第二个线程也执行 exchange,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。
主要接口分析
Exchanger
是最简单的也是最复杂的,简单在于 API 非常简单,就一个构造方法和两个exchange()
方法,最复杂在于它的实现是最复杂的。Exchanger
提供的关键方法如下:exchange(V x)
:当前线程跟另外一个线程交换数据x
,如果另外一个线程的数据准备好,那么当前线程会立刻返回,并获得另外一个线程的数据;否则当前线程会进入等待状态。V exchange(V x, long timeout, TimeUnit unit)
:当前线程跟另外一个线程交换数据x
,有一个指定的超时时间,如果在等待时间超时了,而且还没有收到对方的数据的话,则会抛出TimeoutException
异常。
可以看出,当一个线程到达 exchange
调用点时,如果其他线程此前已经调用了此方法,则其他线程会被调度唤醒并与之进行对象交换,然后各自返回;如果其他线程还没到达交换点,则当前线程会被挂起,直至其他线程到达才会完成交换并正常返回,或者当前线程被中断或超时返回。
Coding 演示
public class ExchangerExample {
private static final Integer WORK_COUNT = 2;
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < WORK_COUNT; i++) {
executorService.execute(() -> {
String beforeObj = Thread.currentThread().getName();
try {
String afterObj = exchanger.exchange(Thread.currentThread().getName());
System.out.println(String.format("currentThread %s , before exchange %s , after exchange %s", Thread.currentThread().getName(), beforeObj, afterObj));
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
}
// Output
// currentThread pool-1-thread-1 , before exchange pool-1-thread-1 , after exchange pool-1-thread-2
// currentThread pool-1-thread-2 , before exchange pool-1-thread-2 , after exchange pool-1-thread-1
5、Phaser
- 多阶段栅栏
简介
CountDownLatch
和 CyclicBarrier
都是 JDK 1.5 引入的,而 Phaser
是 JDK 1.7 引入的。Phaser
的功能与 CountDownLatch
和 CyclicBarrier
有部分重叠,它几乎可以取代 CountDownLatch
和 CyclicBarrier
, 其功能更灵活,更强大,支持动态调整需要控制的线程数。CountDownLatch
,闭锁,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待,它提供了 await()
、countDown()
两个方法来进行操作;CyclicBarrier
,循环栅栏,允许一组线程互相等待,直到到达某个公共屏障点,它提供的 await()
可以实现让所有参与者在临界点到来之前一直处于等待状态;Phaser,多阶段栅栏,它把多个线程协作执行的任务划分为多个阶段,编程时需要明确各个阶段的任务,每个阶段都可以有任意个参与者,线程都可以随时注册并参与到某个阶段,当到达的参与者数量满足栅栏设定的数量后,会进行阶段升级(advance)。Phaser
顾名思义,与阶段相关。Phaser
比较适合这样一种场景,一种任务可以分为多个阶段,现希望多个线程去处理该批任务,对于每个阶段,多个线程可以并发进行,但是希望保证只有前面一个阶段的任务完成之后才能开始后面的任务。这种场景可以使用多个 CyclicBarrier
来实现,每个 CyclicBarrier
负责等待一个阶段的任务全部完成。但是使用 CyclicBarrier
的缺点在于,需要明确知道总共有多少个阶段,同时并行的任务数需要提前预定义好,且无法动态修改。而 Phaser
可同时解决这两个问题。
使用场景
主要接口分析
Phaser
提供的关键方法如下:
arriveAndAwaitAdvance()
:当前线程当前阶段执行完毕,等待其它线程完成当前阶段。如果当前线程是该阶段最后一个未到达的,则该方法直接返回下一个阶段的序号(阶段序号从 0 开始),同时其它线程的该方法也返回下一个阶段的序号。arriveAndAwaitAdvance
方法是不响应中断的,也就是说即使当前线程被中断,arriveAndAwaitAdvance
方法也不会返回或抛出异常,而是继续等待。如果希望能够响应中断,可以参考awaitAdvanceInterruptibly
方法。arriveAndDeregister()
:该方法立即返回下一阶段的序号,并且其它线程需要等待的个数减一,并且把当前线程从之后需要等待的成员中移除。如果该Phaser
是另外一个Phaser
的子Phaser
,并且该操作导致当前Phaser
的成员数为0
,则该操作也会将当前Phaser
从其父Phaser
中移除。arrive()
:该方法不作任何等待,直接返回下一阶段的序号。awaitAdvance(int phase)
:该方法等待某一阶段执行完毕。如果当前阶段不等于指定的阶段或者该Phaser
已经被终止,则立即返回。该阶段数一般由arrive()
方法或者arriveAndDeregister()
方法返回。返回下一阶段的序号,或者返回参数指定的值(如果该参数为负数),或者直接返回当前阶段序号(如果当前Phaser
已经被终止)。awaitAdvanceInterruptibly(int phase)
:效果与awaitAdvance(int phase)
相当,唯一的不同在于若该线程在该方法等待时被中断,则该方法抛出InterruptedException
。awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
:效果与awaitAdvanceInterruptibly(int phase)
相当,区别在于如果超时则抛出TimeoutException
。bulkRegister(int parties)
:注册多个party
。如果当前phaser
已经被终止,则该方法无效,并返回负数。如果调用该方法时,onAdvance
方法正在执行,则该方法等待其执行完毕。如果该Phaser
有父Phaser
则指定的party
数大于0
,且之前该Phaser
的party
数为0
,那么该Phaser
会被注册到其父Phaser
中。forceTermination()
:强制让该Phaser
进入终止状态。已经注册的 party 数不受影响。如果该Phaser
有子Phaser
,则其所有的子Phaser
均进入终止状态。如果该Phaser
已经处于终止状态,该方法调用不造成任何影响。Coding 演示
(1)通过
通过Phaser
实现CyclicBarrier
控制多个线程的执行时机的功能Phaser
控制多个线程的执行时机:有时候希望所有线程到达指定点后再同时开始执行,可以利用CyclicBarrier
来实现,这里给出使用Phaser
的版本。public class PhaserExample {
public static void main(String[] args) {
final int totalRequestCount = 10;
Phaser phaser = new Phaser();
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalRequestCount; i++) {
phaser.register();
executorService.execute(() -> {
// 等待其它参与者线程到达 [arriveAndAwaitAdvance 方法是不响应中断的,也就是说即使当前线程被中断,arriveAndAwaitAdvance 方法也不会返回或抛出异常,而是继续等待。如果希望能够响应中断,可以参考 awaitAdvanceInterruptibly 方法]
int j = phaser.arriveAndAwaitAdvance();
// do something
System.out.println(String.format("currentThread:%s, Executing the task, currentPhase:%s", Thread.currentThread().getName(), j));
});
}
executorService.shutdown();
}
}
// Output
// currentThread:pool-1-thread-1, Executing the task, currentPhase:1
// currentThread:pool-1-thread-2, Executing the task, currentPhase:1
// currentThread:pool-1-thread-7, Executing the task, currentPhase:1
// currentThread:pool-1-thread-4, Executing the task, currentPhase:1
// currentThread:pool-1-thread-6, Executing the task, currentPhase:1
// currentThread:pool-1-thread-3, Executing the task, currentPhase:1
// currentThread:pool-1-thread-10, Executing the task, currentPhase:1
// currentThread:pool-1-thread-9, Executing the task, currentPhase:1
// currentThread:pool-1-thread-5, Executing the task, currentPhase:1
// currentThread:pool-1-thread-8, Executing the task, currentPhase:1
(2)通过
Phaser
实现CyclicBarrier
执行barrierAction
CyclicBarrier
支持 barrier action,Phaser
同样也支持。不同之处是Phaser
的 barrier action 需要改写onAdvance
方法来进行定制。public class PhaserExample {
public static void main(String[] args) {
final int totalRequestCount = 10;
Phaser phaser = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(String.format("%s call back is ready.", Thread.currentThread().getName()));
return super.onAdvance(phase, registeredParties);
}
};
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalRequestCount; i++) {
// 注册各个参与者线程
phaser.register();
executorService.execute(() -> {
// 等待其它参与者线程到达 [arriveAndAwaitAdvance 方法是不响应中断的,也就是说即使当前线程被中断,arriveAndAwaitAdvance 方法也不会返回或抛出异常,而是继续等待。如果希望能够响应中断,可以参考 awaitAdvanceInterruptibly 方法]
int j = phaser.arriveAndAwaitAdvance();
// do something
System.out.println(String.format("currentThread:%s, Executing the task, currentPhase:%s", Thread.currentThread().getName(), j));
});
}
executorService.shutdown();
}
}
// Output
// pool-1-thread-10 call back is ready.
// currentThread:pool-1-thread-10, Executing the task, currentPhase:1
// currentThread:pool-1-thread-9, Executing the task, currentPhase:1
// currentThread:pool-1-thread-7, Executing the task, currentPhase:1
// currentThread:pool-1-thread-8, Executing the task, currentPhase:1
// currentThread:pool-1-thread-5, Executing the task, currentPhase:1
// currentThread:pool-1-thread-3, Executing the task, currentPhase:1
// currentThread:pool-1-thread-1, Executing the task, currentPhase:1
// currentThread:pool-1-thread-6, Executing the task, currentPhase:1
// currentThread:pool-1-thread-2, Executing the task, currentPhase:1
// currentThread:pool-1-thread-4, Executing the task, currentPhase:1
(3)通过
Phaser
实现CountDownLatch
作为一个开关 / 入口功能public class PhaserExample {
public static void main(String[] args) throws IOException {
final int totalRequestCount = 10;
// 注册主线程, 当外部条件满足时, 由主线程打开开关
Phaser phaser = new Phaser(1);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalRequestCount; i++) {
// 注册各个参与者线程
phaser.register();
executorService.execute(() -> {
// 等待其它参与者线程到达 [arriveAndAwaitAdvance 方法是不响应中断的,也就是说即使当前线程被中断,arriveAndAwaitAdvance 方法也不会返回或抛出异常,而是继续等待。如果希望能够响应中断,可以参考 awaitAdvanceInterruptibly 方法]
int j = phaser.arriveAndAwaitAdvance();
// do something
System.out.println(String.format("currentThread:%s, Executing the task, currentPhase:%s", Thread.currentThread().getName(), j));
});
}
// 打开开关 [parties 共 11 个, 主线程从之后需要等待的成员中移除, 即 parties 还剩 10]
phaser.arriveAndDeregister();
System.out.println("主线程打开了开关");
executorService.shutdown();
}
}
// Output
// 主线程打开了开关
// currentThread:pool-1-thread-6, Executing the task, currentPhase:1
// currentThread:pool-1-thread-7, Executing the task, currentPhase:1
// currentThread:pool-1-thread-1, Executing the task, currentPhase:1
// currentThread:pool-1-thread-8, Executing the task, currentPhase:1
// currentThread:pool-1-thread-3, Executing the task, currentPhase:1
// currentThread:pool-1-thread-9, Executing the task, currentPhase:1
// currentThread:pool-1-thread-4, Executing the task, currentPhase:1
// currentThread:pool-1-thread-2, Executing the task, currentPhase:1
// currentThread:pool-1-thread-5, Executing the task, currentPhase:1
(4)通过
Phaser
实现分层public class PhaserExample {
public static void main(String[] args) {
final int parties = 3;
final int phases = 4;
Phaser phaser = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("====== Phase :" + phase + "======");
return super.onAdvance(phase, registeredParties);
}
};
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < parties; i++) {
// 注册各个参与者线程
phaser.register();
executorService.execute(() -> {
for (int phase = 0; phase < phases; phase++) {
// 等待其它参与者线程到达 [arriveAndAwaitAdvance 方法是不响应中断的,也就是说即使当前线程被中断,arriveAndAwaitAdvance 方法也不会返回或抛出异常,而是继续等待。如果希望能够响应中断,可以参考 awaitAdvanceInterruptibly 方法]
int j = phaser.arriveAndAwaitAdvance();
// do something
System.out.println(String.format("currentThread:%s, Executing the task, currentPhase:%s", Thread.currentThread().getName(), j));
}
});
}
executorService.shutdown();
}
}
// Output
// ====== Phase : 0 ======
// currentThread:pool-1-thread-1, Executing the task, currentPhase:1
// currentThread:pool-1-thread-2, Executing the task, currentPhase:1
// currentThread:pool-1-thread-3, Executing the task, currentPhase:1
// ====== Phase : 1 ======
// currentThread:pool-1-thread-3, Executing the task, currentPhase:2
// currentThread:pool-1-thread-1, Executing the task, currentPhase:2
// currentThread:pool-1-thread-2, Executing the task, currentPhase:2
// ====== Phase : 2 ======
// currentThread:pool-1-thread-2, Executing the task, currentPhase:3
// currentThread:pool-1-thread-1, Executing the task, currentPhase:3
// currentThread:pool-1-thread-3, Executing the task, currentPhase:3
// ====== Phase : 3 ======
// currentThread:pool-1-thread-3, Executing the task, currentPhase:4
// currentThread:pool-1-thread-1, Executing the task, currentPhase:4
// currentThread:pool-1-thread-2, Executing the task, currentPhase:4