1. 概述
主要介绍一下使用AQS实现的并发工具类,包括以下几个:
- CountDownLatch
- CyclicBarrier
- Semaphore
- Exchanger
2. CountDownLatch
CountDownLatch,翻译一下,倒数门闩,再翻译一下,倒数结束,门闩才会打开。该工具类主要用于让一个或者多个线程等待其他线程结束。CountDownLatch主要有三个方法,如下所示:
public CountDownLatch(int count) 构造方法,指定倒数的数量,该数量不可被重置。public void countDown() 倒数,即倒数的数量-1public void await() 阻塞等待倒数结束
2.1 使用
以下为CountDownLatch的使用代码,main线程在调用了await方法之后会被阻塞,等待其他线程调用countDown方法使计数器减一,直至减到0。
@Test@SuppressWarnings("all")public void testCountDownLatch() throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(4);for (int i = 0; i < 4; i++) {new Thread(() -> {try {// doBusinessThread.sleep(2000);System.out.println(Thread.currentThread().getName() + " 释放");countDownLatch.countDown();} catch (InterruptedException e) {e.printStackTrace();}}).start();}System.out.println(Thread.currentThread().getName() + " 阻塞等待其他线程");countDownLatch.await();System.out.println(Thread.currentThread().getName() + " 阻塞结束");}
运行结果:
main 阻塞等待其他线程Thread-1 释放Thread-3 释放Thread-2 释放Thread-0 释放main 阻塞结束
2.2 实现
CountDownLatch的实现主要依赖其静态内部类Sync,该类继承了AQS并且实现了共享模式。代码如下所示:
private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;/*** 设置同步状态*/Sync(int count) {setState(count);}/*** 获取同步状态*/int getCount() {return getState();}/*** 获取同步状态,即CountDownLatch的await方法,当当前同步状态不为0时,阻塞等待*/protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}/*** 释放同步状态,调用countDown方法时,会将同步状态减一* 当同步状态==0时,该方法返回true,AQS会唤醒等待队列的后继线程(即调用CountDownLatch.await方法而阻塞的线程)*/protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}}
3. CyclicBarrier
CyclicBarrier,翻译一下,可循环使用的屏障。该工具类主要做的事情:让线程到达屏障时(即调用await方法)被阻塞,直到最后一个线程到达屏障,屏障才会被打开,被阻塞的线程才会继续运行。
ps:屏障拦截的线程数量可被重置
CyclicBarrier主要提供了以下方法:
public CyclicBarrier(int parties) 构造方法,指定屏障拦截的线程数量public CyclicBarrierNote(int parties, Runnable barrierAction)构造方法,指定屏障拦截的线程数量。最后一个到达屏障的线程会先执行barrierAction的逻辑,再继续自己的工作。public int await() 到达屏障,阻塞等待其他线程到达屏障
3.1 使用
@Testpublic void testCyclicBarrier() throws BrokenBarrierException, InterruptedException {CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {System.out.println(Thread.currentThread().getName() + " 执行barrierAction");});new Thread(() -> {try {Thread.sleep(1000);System.out.println(Thread.currentThread().getName() + " 到达屏障");cyclicBarrier.await();System.out.println(Thread.currentThread().getName() + " 执行业务");} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}}).start();System.out.println(Thread.currentThread().getName() + " 到达屏障");cyclicBarrier.await();System.out.println("屏障结束");}
运行结果:
main 到达屏障Thread-0 到达屏障Thread-0 执行barrierActionThread-0 执行业务屏障结束
3.2 实现
CyclicBarrier的实现主要依赖于ReentrantLock以及Condition来实现的,具体看一下dowait方法,代码如下所示:
private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock = this.lock;lock.lock();try {final Generation g = generation;// 判断当前是否被打破了if (g.broken)throw new BrokenBarrierException();// 判断当前是否被中断,被中断则打破屏障if (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}int index = --count;// 如果是最后一个到达的线程,唤醒所有线程if (index == 0) { // trippedboolean ranAction = false;try {final Runnable command = barrierCommand;if (command != null)command.run();ranAction = true;// 产生新一代,唤醒所有线程nextGeneration();return 0;} finally {// 执行barrierCommand.run失败了,打破屏障if (!ranAction)breakBarrier();}}// loop until tripped, broken, interrupted, or timed out// 如果不是最后一个到达的线程,循环等待for (;;) {try {// 等待 和 超时等待 两种情况if (!timed)trip.await();else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {// g == generation && ! g.broken 说明此时当前代没有被别的线程打破屏障if (g == generation && ! g.broken) {breakBarrier();throw ie;}// 当前线程被中断了,但是此时应该是别的线程打破了屏障else {// We're about to finish waiting even if we had not// been interrupted, so this interrupt is deemed to// "belong" to subsequent execution.Thread.currentThread().interrupt();}}// 别的线程打破了屏障,抛出异常if (g.broken)throw new BrokenBarrierException();// 代 已经被别的线程更新if (g != generation)return index;// 等待超时了,打破屏障if (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}}
4. Semaphore
Semaphore,翻译一下,信号量。Semaphore是用来控制同时访问特定资源的线程数量。举个🌰,卫生间有三个位置,决定当前仅有三个人能进去,其他人则会在门口进行排队,只有当里面的人出来,外面的人才能够进去。
Semaphore 支持公平和非公平模式,默认非公平模式。
- 公平模式无论是否有许可,都会判断是否线程在排队,如果有线程排队,获取线程立即失败,进入排队;
- 非公平模式无论许可是否充足,直接尝试获取许可。
Semaphore主要有以下四个方法:
public Semaphore(int permits) 定义可以访问共享资源的许可证数量public void acquire() 请求获取访问许可public void release() 是否访问许可
4.1 使用
@Test@SuppressWarnings("all")public void testSemaphore() throws Exception{Semaphore restRoom = new Semaphore(2);for (int i = 0; i < 3; i++) {Thread thread = new Thread(() -> {try {System.out.println(Thread.currentThread().getId() + "想上厕所");Thread.sleep(500);restRoom.acquire(1);System.out.println(Thread.currentThread().getId() + "进入厕所");Thread.sleep(new Random().nextInt(2000));System.out.println(Thread.currentThread().getId() + "上完厕所");} catch (InterruptedException e) {e.printStackTrace();} finally {restRoom.release();}});thread.start();}Thread.sleep(10000);}
运行结果:
10想上厕所11想上厕所12想上厕所11进入厕所10进入厕所11上完厕所12进入厕所12上完厕所10上完厕所
4.2 实现
Semaphore的实现主要依赖于其内部类Sync,具体代码如下所示:
abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 1192457210091910933L;/*** 构造方法,设置总许可数量*/Sync(int permits) {setState(permits);}/*** 获取剩余许可数量*/final int getPermits() {return getState();}/*** 非公平获取许可*/final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}/*** 自旋,释放许可*/protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}}/*** 自旋,减少许可数量*/final void reducePermits(int reductions) {for (;;) {int current = getState();int next = current - reductions;if (next > current) // underflowthrow new Error("Permit count underflow");if (compareAndSetState(current, next))return;}}/*** 丢弃所有许可*/final int drainPermits() {for (;;) {int current = getState();if (current == 0 || compareAndSetState(current, 0))return current;}}}/*** 非公平模式*/static final class NonfairSync extends Sync {private static final long serialVersionUID = -2694183684443567898L;NonfairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}}/*** 公平模式*/static final class FairSync extends Sync {private static final long serialVersionUID = 2014338818796000944L;FairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {for (;;) {// 公平非公平的区别主要就在这,公平模式时会判断当前队列是否有线程在排队,有就获取失败if (hasQueuedPredecessors())return -1;int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}}
