一、应用
与CountDownLatch类似,也是基于AQS,实现线程等待,达到指定线程数量后,线程被唤醒继续执行。
车站流水发车场景:
CyclicBarrier认为是一个车站,每辆车可乘坐人数为CyclicBarrier的阈值;把乘客认为是线程。
未达到阈值前,所有乘客在车中等待(线程阻塞)。
当线程达到阈值,则进行发车(即,唤醒线程继续执行)。然后阈值重置,重新等待到来线程达到阈值。
应用示例
//统计工厂员工平均工作量static ThreadPoolExecutor executor = new ThreadPoolExecutor(3,5,100,TimeUnit.SECONDS,new ArrayBlockingQueue<>(100),(ThreadFactory) Thread::new);private Map<String, Integer> workCount = new ConcurrentHashMap<>();public static void main(String[] args) {new CyclicBarrierDemo().count();}private CyclicBarrier cb = new CyclicBarrier(3, () -> {int result = 0;for (String key : workCount.keySet()) {result += workCount.get(key);}System.out.println("工厂工人平均工作量为:" + (result / 3) );});private void count() {for (int i = 0; i < 5; i++) {executor.execute(() -> {//统计工人工作量int count = (int) (Math.random() * 40 + 60);workCount.put(Thread.currentThread().getName(), count);System.out.println(Thread.currentThread().getName()+ "工人工作量:" + count);try {//执行完运行await(),等待所有学生平均成绩都计算完毕cb.await();} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}});}executor.shutdown();}}Thread-2工人工作量:92Thread-0工人工作量:89Thread-1工人工作量:95工厂工人平均工作量为:92
二、原理分析
await()方法
/**** 如果当前线程不是最后一个到达的线程,则出于线程调度目的,将其休眠,*直到发生以下情况之一:* 1.最后一个线程到达;(正常情况)* 2.其他线程中断当前线程;(当前线程抛出InterruptedException,并清除当前线程的中断状态;其他线程抛出BrokenBarrierException)* 3.另一个线程中断另一个等待的线程;* 4.其他线程在等待屏障时超时;(引发BrokenBarrierException)* 5.其他一些线程在此屏障上调用重置。**/public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // cannot happen}}
dowait(false, 0L)核心逻辑
private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock = this.lock;lock.lock();try {final Generation g = generation;//如果之前有线程被中断,则后续线程抛出BrokenBarrierExceptionif (g.broken)throw new BrokenBarrierException();//检查当前线程是否被中断。中断则抛出InterruptedException。并使后续线程抛出BrokenBarrierExceptionif (Thread.interrupted()) {breakBarrier();//【1.】设置标识,使得后续线程抛出BrokenBarrierException;【2.】并唤醒所有已阻塞的线程throw new InterruptedException();}//count记录阈值,每到一个线程则【自减一】int index = --count;//如果是最后一个线程,(如果有)则执行初始化时,设置的命令if (index == 0) { // trippedboolean ranAction = false;try {final Runnable command = barrierCommand;if (command != null)command.run();ranAction = true;//唤醒所有阻塞线程。开始“新朝代”循环nextGeneration();return 0;} finally {//处理未知异常if (!ranAction)breakBarrier();}}// loop until tripped, broken, interrupted, or timed out/*** 执行到此处的线程* 1.最后一个线程* 2.非最后一个(会被阻塞)**/for (;;) {try {if (!timed)trip.await();//不是最后一个线程,则会被阻塞else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {// We're about to finish waiting even if we had not// been interrupted, so this interrupt is deemed to// "belong" to subsequent execution.Thread.currentThread().interrupt();}}if (g.broken)throw new BrokenBarrierException();//执行到此处线程。1.最后一个线程。2.重新被唤醒的线程//最后一个线程会开始“新朝代”循环,则g != generation为true,跳出循环if (g != generation)return index;if (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}}
nextGeneration()方法
/*** 唤醒所有阻塞线程。开始“新朝代”循环* private final Condition trip = lock.newCondition();**/private void nextGeneration() {// signal completion of last generationtrip.signalAll();// set up next generationcount = parties;generation = new Generation();}
await()方法
AbstractQueuedSynchronizer。加入条件队列被阻塞
/*** 1.将当前节点添加到条件队列中。* 2.并释放已经获取的锁。(state变量归零,并唤醒因争抢锁被阻塞的线程)。* 3.调用park方法,阻塞当前线程**/public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();//添加的条件队列中(条件队列要素:firstWaiter、lastWaiter,条件队列中waitState=-2)Node node = addConditionWaiter();//释放已经持有的锁,并唤醒因争抢锁被阻塞的线程int savedState = fullyRelease(node);int interruptMode = 0;//如果不在等待队列中,则进行阻塞while (!isOnSyncQueue(node)) {LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}//线程被唤醒后的逻辑。重新获取锁(独占锁逻辑)if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}
signalAll()方法
AbstractQueuedSynchronizer。从条件队列中被唤醒
/**** 1.将条件队列中元素,转移到等待队列中* 2.并将条件队列中的waitstate。-2修改成0,最后修改成-1**/public final void signalAll() {//如果当前线程,不是持有锁的线程,则抛出异常if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignalAll(first);}//【循环】条件队列,转移到等待队列中private void doSignalAll(Node first) {lastWaiter = firstWaiter = null;do {Node next = first.nextWaiter;first.nextWaiter = null;transferForSignal(first);first = next;} while (first != null);}final boolean transferForSignal(Node node) {/** 将-2修改为0*/if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;/** 1.将节点插入等待队列尾部* 2.将0修改为-1(等待当前线程释放锁,唤醒等待队列中元素)** 异常情况下,才立即唤醒*/Node p = enq(node);int ws = p.waitStatus;if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;}
