CountDownLatch
CountDownLatch是一个同步功能的辅助类,其效果是给定一个计数(count),当使用这个CountDownLatch类的线程判断计数不为0时,则处于wait阻塞状态,如果为0则继续执行。
实现等待与继续运行的效果分别需要使用await()和countDown()方法来进行。调用await()方法时判断当前计数count是否为0,不为0则阻塞等待;其他线程调用countDown()方法将当前计数减1,当减到0是等到线程运行。
CountDownLatch也是使用AQS实现,AQS state值就是当前计数count
public class CountDownLatch {
// 同步控制的计数锁。使用AQS表示计数。
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count); // state表示当前计数
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
/**
* 用给定的计数构造一个countdownlock。
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
/**
* 导致当前线程等待,直到锁存数被计算为零,除非线程被中断。
*/
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* 导致当前线程等待,直到存锁被计算为零,除非线程被中断,或者指定的等待时间流逝。
* 如果当前计数为零,那么该方法将立即返回true。
*/
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/**
*如果计数达到零,释放所有等待线程的计数。
*如果电流计数大于零,那么它就会递减。
*如果新计数为零,那么所有等待线程都重新启用以实现线程调度。
*如果当前计数等于0,那么什么都没有发生。
*/
public void countDown() {
sync.releaseShared(1);
}
/**
* 获取当前计数
*/
public long getCount() {
return sync.getCount();
}
}
public class Code01_CountDownLatchDemo {
private CountDownLatch countDownLatch = new CountDownLatch(1);
public void workStart() {
System.out.println("test begin");
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("test end");
}
public void workDown() {
System.out.println("work over");
countDownLatch.countDown();
}
public static void main(String[] args) {
Code01_CountDownLatchDemo t = new Code01_CountDownLatchDemo();
new Thread(t::workStart).start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(t::workDown).start();
}
}
CyclicBarrier
CyclicBarrier 不仅具有CountdownLatch全部功能,还可以实现阶段性同步功能,其使用意义在于可以循环地实现线程要一起做任务的目标,而CountDownLatch仅仅支持一次线程与同步点阻塞。
CyclicBarrier允许一组线程互相等待,直到某个公共屏障点(common barrier point),这些线程必须实时互相等待,可使用CyclicBarrier实现。
CyclicBarrier 底层是使用ReentrantLock + Condition 实现,ReenTrantLcok底层是AQS实现。
/** ReentrantLock */
private final ReentrantLock lock = new ReentrantLock();
/** Condition 等待队列 */
private final Condition trip = lock.newCondition();
/** 一组参与者的数量(需要等待的数量) */
private final int parties;
/* 参与者到齐后的回调 */
private final Runnable barrierCommand;
/** 当前这一组*/
private Generation generation = new Generation();
/** 还需要等待的数量 */
private int count;
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
await方法
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
ClyclicBarrier#await 方法都是调用了 dowait方法,dowait方法实现:
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 实例的ReentrantLock
final ReentrantLock lock = this.lock;
// 获取锁
lock.lock();
try {
// 当前的这一组
final Generation g = generation;
// 当前这一组Barrier是否broken(前面有线程被interttupt)
if (g.broken)
// 抛出异常
throw new BrokenBarrierException();
// 当前线程是否被中断
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 编号为cout-1
int index = --count;
// 为最后一个需要等待的线程
if (index == 0) { // tripped
boolean ranAction = false;
try {
// 等齐的后执行的线程
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 执行结束开启下一代(下一组等待)
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await(); // 添加到condition等待队列
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
// 如果线程被interrupt,则开启新的一代,当前代broken==true,抛出异常
breakBarrier();
throw ie;
} else {
// 当前代被被破坏了
Thread.currentThread().interrupt();
}
}
// 如果当前组被破坏
if (g.broken)
throw new BrokenBarrierException();
// 如果不是当前这一代,返回index
if (g != generation)
return index;
// 如果等待时间到了
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
// 释放锁
lock.unlock();
}
}
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll(); // 唤醒全部等待线程,开启新的一代,重新await
}
reset
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation // 开始新的一代
} finally {
lock.unlock();
}
}
private void nextGeneration() {
// signal completion of last generation
trip.signalAll(); // 唤醒等待队列全部线程
// set up next generation
count = parties;
generation = new Generation(); // 重新一代
}
Semaphore
Swmaphore主要是用于限制并发线程数量,是基于AQS实现。有公平实现和非公平实现。AQS state值表示可用的权限数量。
public Semaphore(int permits) {
sync = new NonfairSync(permits); // 调用非公平实现
}
// true 调用公平实现,否则调用非公平实现
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
公平实现、非公平实现都是调用Sync
FairSync(int permits) {
super(permits);
}
NonfairSync(int permits) {
super(permits);
}
Sync(int permits) {
setState(permits); // state值表示权限可用数量
}
acquire
等待获取许可,阻塞直到需要的数量许可可用或者线程被中断。acquire() 默认需要1个许可,acquire(int permits)可指定需要许可数量。底层调用AQS#acquireSharedInterruptibly
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) // tryAcquireShared 由子类实现
doAcquireSharedInterruptibly(arg); // 如果剩余数量小于0 ,调用AQS#doAcquireSharedInterruptibly
}
非公平Semaphore#tryAcquireShared
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires); // 调用Sync#nonfairTryAcquireShared
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 当前state量
int available = getState();
// 剩余权限数量
int remaining = available - acquires;
if (remaining < 0 || // 剩余权限数量小于0
compareAndSetState(available, remaining)) // CAS获取权限
return remaining; // 返回剩余数量
}
}
公平Semaphore#tryAcquireShared
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors()) // ASQ#hasQueuedPredecessors AQS阻塞等待有一个头结点
return -1;
// 当前线程位于AQS等待队列头部
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
acquireUninterruptibly
acquireUninterruptibly,阻塞等待获取许可,直到获取到许可,与acquire相比,
acquireUninterruptibly不可被中断。
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0) // 子类实现tryAcquireShared
doAcquireShared(arg);
}
tryAcquire
tryAcquire() 直接调用sync.nonfairTryAcquireShared(1),不会被阻塞。
tryAcquire(long timeout, TimeUnit unit),调用AQS#tryAcquireSharedNanos,阻塞,获取到许可或者等待时间后自动返回。
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
release
释放许,底层是AQS#releaseShared是按
public void release() {
sync.releaseShared(1);
}
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // tryRelease 由子类实现
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 当前state值
int current = getState();
// 归还权限后的state值
int next = current + releases;
// 超出权限数量最大限制
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next)) // CAS操作归还权限
return true;
}
}
Exchanger
Exchanger 的功能是使2个线程之间传输数据。
public class Code02_ExchangerDemo01 {
@SuppressWarnings("all")
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
Thread thead1 = new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "获取到线程2的值: " + exchanger.exchange("线程1的值:ONE"));
System.out.println("线程1结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "线程1");
Thread thead2 = new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "获取到线程1的值: " + exchanger.exchange("线程2的值:TWO"));
System.out.println("线程2结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "线程2");
thead1.start();
thead2.start();
}
}
Phaser
一个可重用的同步屏障,在功能上类似于CyclicBarrier和CountDownLatch,但支持更灵活的使用。
Registration: 与CountDonwnLatch、CyclicBarrier不同,Phaser注册的同步参与者数量(parties)可以随时间变化(可使用register、bulkRegister、构造函数初始化参与者数量(parties的值),可以使用arriveAndDeregister注销)。 Synchronization:与CyclicBarrier一样,Phaser也可以被反复等待。方法arriveAndAwaitAdvance的作用类似于CyclicBarrier.await。Phaser每一代都有一个phase number,从0开始,所有参与者到达时加1,达到Integer.MAX_VALUE时又回到0。 Arrival:方法arrive和arriveAndDeregister记录到达者。这两个方法非阻塞,只返回到达的phaser 的phase number。当给定阶段的最后一方到达时,将执行一个可选操作,并推进该阶段。 Waiting:方法awaitAdvance需要一个参数来指示到达阶段号,并在相位器前进到(或已经处于)不同阶段时返回,该方法不可被中断。 Termination:Phaser可以进入终止状态,可以通过方法isTerminated方法检查状态。当进入终止状态,所有同步方法立即返回。当调用onAdvance返回true时触发终止。 Tiering:相位器可以分层以减少争用。具有大量参与方的相位器可能会经历大量的同步争用成本,因此可以将其设置为一组子相位器共享一个公共父节点。这可能会大大增加吞吐量,尽管它会导致更大的操作开销。 Monitoring:虽然同步方法只能由已注册的参与者调用,但是相位器的当前状态可以由任何调用方监视。在任何给定时刻,getRegisteredParties返回总共参与者的数量,其中getArrivedParties已经到达当前阶段(getPhase)的参与者。当剩余的(getUnarrivedParties)方到达时,阶段向前推进。
/**
state值的
0-15位:表示unarrived
16-31位:表示wait
32-62位:表示terminated
63位:符号位
*/
private volatile long state;
- arriveAndAwaitAnvance 到达屏障处等待
- arriveAndDeregister 到达屏障出取消参与,parties数减1
- getPhase 获取已经到达第几个屏障
- onAdvance 通过新的屏障是被调用
- getRegisteredParties 获得注册的parties数量
- register 动态地添加一个parties的值
- bulkRegister 批量增加partise的值
- getArrivedParties 获取已经到达的parties数
- getUnarrivedParties 获取没有到达的parties数
- arrive 是到达的parties的值加1,并且不在屏障处等待,继续执行
- awaitAdvance(int phase) 在第几个屏障处等待(如果getPhase方法返回的值等于传入的参数,则在屏障处等待,否则继续运行
- awaitAdvanceInterruptibly(int) 是可被中断的,而arriveAndAwaitAnvance 是不可被中断的
- forceTermination 是phaser屏障功能失效
- isTerminated 判断phaser对象是否是销毁状态