栅栏屏障,作用是让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。其实AQS里面另外一个工具 CountDownLantch 也可以实现该功能,只不过 CountDownLantch 的作用是一次性的,而 CyclicBarrier 可以重复起作用。
构造方法及成员属性
//锁资源private final ReentrantLock lock = new ReentrantLock();//用于创建条件等待队列private final Condition trip = lock.newCondition();//每次拦截的线程数private final int parties;//线程计数器,当线程数达到指定数量后统一放行private int count;//代表栅栏的当前代,每次count变为0,代表重新开始新一代,重新拦截线程private Generation generation = new Generation();//每次换代前执行的任务private final Runnable barrierCommand;
CyclicBarrier 内部是通过条件队列 trip 来对线程进行阻塞的,并且其内部维护了两个int型的变量 parties 和 count,parties表示每次拦截的线程数,该值在构造时进行赋值。count是内部计数器,它的初始值和parties相同,以后随着每次 await() 方法的调用而减1,直到减为0就将所有线程唤醒。
CyclicBarrier 有一个静态内部类 Generation,该类的对象代表栅栏的当前代,就像玩游戏时代表的本局游戏,barrierCommand 表示换代前执行的任务,当 count 减为0时表示本局游戏结束,需要转到下一局。在转到下一局游戏之前会将所有阻塞的线程唤醒,在唤醒所有线程之前你可以通过指定barrierCommand来执行自己的任务
public CyclicBarrier(int parties) {this(parties, null);}--------------------------------------public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();this.parties = parties;this.count = parties;this.barrierCommand = barrierAction;}
await() 等待
等待有两种方式,一种是定时等待,另一种是非定时等待
public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // cannot happen}}---------------------------public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {return dowait(true, unit.toNanos(timeout));}
不管哪种方式最终都会走到 dowait() 方法,进行阻塞或放行
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {final ReentrantLock lock = this.lock;//获取锁lock.lock();try {final Generation g = generation;//如果当前栅栏别标记为broken,直接抛异常if (g.broken)throw new BrokenBarrierException();//如果当前线程发生中断,则会打破栅栏if (Thread.interrupted()) {//打破栅栏,标记为broken,重置count,释放条件等待队列中的线程breakBarrier();throw new InterruptedException();}//count减1int index = --count;//如果count=0,说明拦截线程数以满足,开始放行if (index == 0) {boolean ranAction = false;try {final Runnable command = barrierCommand;//执行自定义的换代任务if (command != null)command.run();ranAction = true;//使条件等待队列中的节点都出队进入到CLH队列,重置count,开启下一轮拦截nextGeneration();return 0;} finally {if (!ranAction)breakBarrier();}}//count !=0,线程数还没到齐,走下面逻辑for (;;) {try {if (!timed)//调用 await()方法进入条件等待队列并进行阻塞trip.await();else if (nanos > 0L)//以超时的方式进入条件等待队列并进行阻塞nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {Thread.currentThread().interrupt();}}if (g.broken)throw new BrokenBarrierException();if (g != generation)return index;//以超时的方式进入条件等待队列,如果是被超时唤醒,打破栅栏并报异常if (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}//释放锁} finally {lock.unlock();}}
阻塞等待线程
如果当前线程数未达到放行数量时,将count 减1,需要释放 lock 并且放入条件等待队列中进行阻塞
通过调用 Condition对象的 await() 方法加入条件等待队列,主要干三件事
- 将当前线程加入条件等待队列
- 释放当前线程获得的锁,并唤醒CLH队头节点
阻塞当前线程
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();//创建节点并加入条件等待队列Node node = addConditionWaiter();//释放锁int savedState = fullyRelease(node);int interruptMode = 0;//如果当前节点不再CLH队列中while (!isOnSyncQueue(node)) {//将当前线程阻塞LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}//被唤醒后调用acquireQueued()继续尝试获取锁if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}

栅栏放行
当线程达到放行数量时,即count=0,调动 nextGeneration(),将条件等待队列中的线程转移到CLH 队列中,并且重置 count,开启下一轮拦截
private void nextGeneration() {//将条件等待队列中的线程放入到CLH队列中trip.signalAll();//重置count,开启下一轮拦截count = parties;generation = new Generation();}
将条件等待队列中的节点全部出队
public final void signalAll() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)//出队doSignalAll(first);}----------------------------private void doSignalAll(Node first) {lastWaiter = firstWaiter = null;//通过while循环不断出队,知道头节点为nulldo {Node next = first.nextWaiter;first.nextWaiter = null;transferForSignal(first);first = next;} while (first != null);}----------------------------final boolean transferForSignal(Node node) {//将节点的状态由CONDITION设置为0if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))//设置失败直接return falsereturn false;//将节点加入CLH队列,返回旧的tail节点Node p = enq(node);int ws = p.waitStatus;//如果节点时CANCELLED状态或者旧的tail节点waitStatus设置失败,则直接唤醒线程(做异常处理)if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;}
最后一条到达的线程在 finally 中执行释放锁的操作,唤醒CLH队列中的所有节点。
