一、应用
与CountDownLatch类似,也是基于AQS,实现线程等待,达到指定线程数量后,线程被唤醒继续执行。
车站流水发车场景:
CyclicBarrier认为是一个车站,每辆车可乘坐人数为CyclicBarrier的阈值;把乘客认为是线程。
未达到阈值前,所有乘客在车中等待(线程阻塞)。
当线程达到阈值,则进行发车(即,唤醒线程继续执行)。然后阈值重置,重新等待到来线程达到阈值。
应用示例
//统计工厂员工平均工作量
static ThreadPoolExecutor executor = new ThreadPoolExecutor(
3,
5,
100,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
(ThreadFactory) Thread::new);
private Map<String, Integer> workCount = new ConcurrentHashMap<>();
public static void main(String[] args) {
new CyclicBarrierDemo().count();
}
private CyclicBarrier cb = new CyclicBarrier(3, () -> {
int result = 0;
for (String key : workCount.keySet()) {
result += workCount.get(key);
}
System.out.println("工厂工人平均工作量为:" + (result / 3) );
});
private void count() {
for (int i = 0; i < 5; i++) {
executor.execute(() -> {
//统计工人工作量
int count = (int) (Math.random() * 40 + 60);
workCount.put(Thread.currentThread().getName(), count);
System.out.println(Thread.currentThread().getName()
+ "工人工作量:" + count);
try {
//执行完运行await(),等待所有学生平均成绩都计算完毕
cb.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
}
Thread-2工人工作量:92
Thread-0工人工作量:89
Thread-1工人工作量:95
工厂工人平均工作量为:92
二、原理分析
await()方法
/**
*
* 如果当前线程不是最后一个到达的线程,则出于线程调度目的,将其休眠,
*直到发生以下情况之一:
* 1.最后一个线程到达;(正常情况)
* 2.其他线程中断当前线程;(当前线程抛出InterruptedException,并清除当前线程的中断状态;其他线程抛出BrokenBarrierException)
* 3.另一个线程中断另一个等待的线程;
* 4.其他线程在等待屏障时超时;(引发BrokenBarrierException)
* 5.其他一些线程在此屏障上调用重置。
**/
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
dowait(false, 0L)核心逻辑
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
//如果之前有线程被中断,则后续线程抛出BrokenBarrierException
if (g.broken)
throw new BrokenBarrierException();
//检查当前线程是否被中断。中断则抛出InterruptedException。并使后续线程抛出BrokenBarrierException
if (Thread.interrupted()) {
breakBarrier();//【1.】设置标识,使得后续线程抛出BrokenBarrierException;【2.】并唤醒所有已阻塞的线程
throw new InterruptedException();
}
//count记录阈值,每到一个线程则【自减一】
int index = --count;
//如果是最后一个线程,(如果有)则执行初始化时,设置的命令
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
//唤醒所有阻塞线程。开始“新朝代”循环
nextGeneration();
return 0;
} finally {
//处理未知异常
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
/**
* 执行到此处的线程
* 1.最后一个线程
* 2.非最后一个(会被阻塞)
**/
for (;;) {
try {
if (!timed)
trip.await();//不是最后一个线程,则会被阻塞
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
//执行到此处线程。1.最后一个线程。2.重新被唤醒的线程
//最后一个线程会开始“新朝代”循环,则g != generation为true,跳出循环
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
nextGeneration()方法
/**
* 唤醒所有阻塞线程。开始“新朝代”循环
* private final Condition trip = lock.newCondition();
**/
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
await()方法
AbstractQueuedSynchronizer。加入条件队列被阻塞
/**
* 1.将当前节点添加到条件队列中。
* 2.并释放已经获取的锁。(state变量归零,并唤醒因争抢锁被阻塞的线程)。
* 3.调用park方法,阻塞当前线程
**/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//添加的条件队列中(条件队列要素:firstWaiter、lastWaiter,条件队列中waitState=-2)
Node node = addConditionWaiter();
//释放已经持有的锁,并唤醒因争抢锁被阻塞的线程
int savedState = fullyRelease(node);
int interruptMode = 0;
//如果不在等待队列中,则进行阻塞
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//线程被唤醒后的逻辑。重新获取锁(独占锁逻辑)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
signalAll()方法
AbstractQueuedSynchronizer。从条件队列中被唤醒
/**
*
* 1.将条件队列中元素,转移到等待队列中
* 2.并将条件队列中的waitstate。-2修改成0,最后修改成-1
**/
public final void signalAll() {
//如果当前线程,不是持有锁的线程,则抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
//【循环】条件队列,转移到等待队列中
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
final boolean transferForSignal(Node node) {
/*
* 将-2修改为0
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* 1.将节点插入等待队列尾部
* 2.将0修改为-1(等待当前线程释放锁,唤醒等待队列中元素)
*
* 异常情况下,才立即唤醒
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}