一,源码

对于 CountDownLatch,我们仅仅需要关心两个方法,一个是 countDown() 方法,另一个是 await() 方法。countDown() 方法每次调用都会将 state 减 1,直到state 的值为 0;而 await 是一个阻塞方法,当 state 减为 0 的时候,await 方法才会返回。await 可以被多个线程调用,所有调用了await 方法的线程阻塞在 AQS 的阻塞队列中,等待条件满足(state == 0),将线程从队列中一个个唤醒过来。
1.内部类
private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;//CountDownLatch中的计数其实就是AQS的stateSync(int count) {setState(count);}int getCount() {return getState();}protected int tryAcquireShared(int acquires) {//如果state =0 返回1 否则返回 0return (getState() == 0) ? 1 : -1;}protected boolean tryReleaseShared(int releases) {for (;;) {//获取最新的stateint c = getState();//如果state==0 返回falseif (c == 0)return false;int nextc = c-1;//如果cas成功,且c=1,返回trueif (compareAndSetState(c, nextc))return nextc == 0;}}}
2.构造函数
private final Sync sync;//构造函数public CountDownLatch(int count) {//边界值判断if (count < 0) throw new IllegalArgumentException("count < 0");//初始化Syncthis.sync = new Sync(count);}
3.await
使当前线程挂起,直到计数器减为0或者当前线程被中断。
public void await() throws InterruptedException {//执行aqs.acquireSharedInterruptibly()sync.acquireSharedInterruptibly(1);}
4.AQS.acquireSharedInterruptibly
countdownlatch 也用到了 AQS,在 CountDownLatch 内部写了一个 Sync 并且继承了AQS 这个抽象类重写了 AQS中的共享锁方法。首先看到下面这个代码,这块代码主要是 判 断 当 前 线 程 是 否 获 取 到 了 共 享 锁 ; ( 在CountDownLatch 中 , 使 用 的是 共 享 锁 机 制 , 因 为CountDownLatch 并不需要实现互斥的特性)。
public final void acquireSharedInterruptibly(long arg) throws InterruptedException {//如果当前线程被中断,抛出中断异常if (Thread.interrupted())throw new InterruptedException();//条件成立:说明此时state>0将线程入队,然后等待唤醒//条件不成立:说明此时state=0,说明此时阻塞已经放开,当前线程不会被阻塞if (tryAcquireShared(arg) < 0)//将当前线程加入到共享锁队列doAcquireSharedInterruptibly(arg);}
5.tryAcquireShared
判断state状态.
protected int tryAcquireShared(int acquires) {//如果state =0 返回1 否则返回 0return (getState() == 0) ? 1 : -1;}
6.AQS.doAcquireSharedInterruptibly
addWaiter 设置为 shared 模式。
tryAcquire 和 tryAcquireShared 的返回值不同,因此会多出一个判断过程。
在 判 断 前 驱 节 点 是 头 节 点 后 , 调 用 了setHeadAndPropagate 方法,而不是简单的更新一下头节点。
private void doAcquireSharedInterruptibly(long arg)throws InterruptedException {//将当前线程封装成节点入队,共享节点,使用的是state的高16位运算final Node node = addWaiter(Node.SHARED);boolean failed = true;try {//自旋for (;;) {//获取当前节点的前驱final Node p = node.predecessor();//如果前驱节点是头节点if (p == head) {//当前节点就可以尝试去抢锁long r = tryAcquireShared(arg);//此时说明抢到锁了if (r >= 0) {//修改头节点的值setHeadAndPropagate(node, r);//头节点出队p.next = null; // help GC//代表抢锁成功failed = false;return;}}//否则的话,线程在这里park,如果线程中断信号=true,就会抛出中断异常if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {//如果抢锁失败了,就走取消竞争锁的逻辑if (failed)cancelAcquire(node);}}
假如这个时候有 3 个线程调用了 await 方法,由于这个时候 state 的值还不为 0,所以这三个线程都会加入到 AQS队列中。并且三个线程都处于阻塞状态。
7.countDown
递减锁计数,如果锁计数为0,释放所有阻塞线程。
public void countDown() {sync.releaseShared(1);}
8.AQS.releaseShared
由于线程被 await 方法阻塞了,所以只有等到countdown 方法使得 state=0 的时候才会被唤醒。
只有当 state 减为 0 的时候,tryReleaseShared 才返回 true, 否则只是简单的 state = state - 1。
如果 state=0, 则调用 doReleaseShared唤醒处于 await 状态下的线程。
public final boolean releaseShared(int arg) {//执行子类重写的方法,state=0的时候,执行doReleaseShared//条件成立:说明当前调用latch.countDown()方法的线程,正好是state-1 == 0 的这个线程,需要做触发唤醒await状态的线程。if (tryReleaseShared(arg)) {//调用countDown()方法的线程,只有一个线程会进入到这个if块里面,执行下面的方法doReleaseShared();return true;}return false;}
9.tryReleaseShared
自旋释放锁,释放完了返回true,否则返回false。
protected boolean tryReleaseShared(int releases) {for (;;) {//获取最新的stateint c = getState();//如果state==0 返回falseif (c == 0)return false;int nextc = c-1;//如果cas成功,且c=1,返回trueif (compareAndSetState(c, nextc))return nextc == 0;}}
10.AQS.doReleaseShared
共享锁的释放和独占锁的释放有一定的差别
前面唤醒锁的逻辑和独占锁是一样,先判断头结点是不是SIGNAL 状态,如果是,则修改为 0,并且唤醒头结点的下一个节点。
PROPAGATE : 标识为 PROPAGATE 状态的节点,是共享锁模式下的节点状态,处于这个状态下的节点,会对线程的唤醒进行传播
private void doReleaseShared() {//自旋for (;;) {//获取头节点的引用Node h = head;//如果头节点不为空 && 头节点不等于尾结点//条件一成立:说明阻塞队列不为空//什么时候不成立?latch创建出来以后,没有任何线程调用过await方法之前,就有线程调用countDown操作,并且触发了唤醒阻塞节点的逻辑//条件二成立:说明当前队列除了头节点还有其他节点//什么时候不成立?//1.正常唤醒情况:依次获取共享锁,当前线程执行到这里的时候是tail节点//2.第一个调用await的线程与调用countDown的线程并发了if (h != null && h != tail) {int ws = h.waitStatus;//如果头结点的转态=-1if (ws == Node.SIGNAL) {//cas设置头节点的状态失败if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;//cas成功,唤醒头节点的下一个节点unparkSuccessor(h);}//cas失败走到这里,//执行到这里的时候,刚好有一个节点入队,入队会将这个 ws 设置为 -1else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;}/*条件成立:1.说明刚刚唤醒的后继节点,还没将自己设置为头节点,没执行到呢....这个时候,当前线程直接跳出去结束了此时并不需要担心 唤醒逻辑 在这里断开 ,因为被唤醒的线程,早晚会执行到doReleaseShared方法2.head==nulllatch创建出来以后,没有任何线程调用过await方法之前,就有线程调用countDown操作,并且触发了唤醒阻塞节点的逻辑3.h==tailbreak条件不成立:条件成立1的相反情况,此时唤醒他的节点 执行 h == head 不成立,此时 原头节点不会跳出,会继续唤醒新的头节点的后继节点。*/if (h == head)break;}}
11.AQS.doAcquireSharedInterruptibly
一旦 ThreadA 被唤醒,代码又会继续回到doAcquireSharedInterruptibly 中来执行。如果当前 state满足=0 的条件,则会执行 setHeadAndPropagate 方法。
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED);//创建一个共享模式的节点添加到队列中boolean failed = true;try {for (;;) {//被唤醒的线程进入下一次循环继续判断final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);//就判断尝试获取锁if (r >= 0) {//r>=0 表示获取到了执行权限,这个时候因为 state!=0,所以不会执行这段代码setHeadAndPropagate(node, r);p.next = null; // help GC 把当前节点移除 aqs 队列failed = false;return;}}//阻塞线程if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}
12.setHeadAndPropagate
这个方法的主要作用是把被唤醒的节点,设置成 head 节点。 然后继续唤醒队列中的其他线程。由于现在队列中有 3 个线程处于阻塞状态,一旦 ThreadA被唤醒,并且设置为 head 之后,会继续唤醒后续的ThreadB。
private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check below//将当前节点设置为头节点setHead(node);//1>0if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;//条件一:s==null 什么时候成立呢? 当前node节点已经是tail节点了,//条件二的前置条件:s!=null 要求s的模式是共享模式if (s == null || s.isShared())//继续向后唤醒doReleaseShared();}}
13.流程图
二,使用
countdownlatch 是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完毕再执行。从命名可以解读到 countdown 是倒数的意思,类似于倒计时的概念。
countdownlatch 提供了两个方法,一个是 countDown,一个是 await, countdownlatch 初始化的时候需要传入一个整数,在这个整数倒数到 0 之前,调用了 await 方法的程序都必须要等待,然后通过 countDown 来倒数。
/*** @author 二十* @since 2021/9/6 2:00 下午*/public class DemoA {private static CountDownLatch c = new CountDownLatch(6);private static ThreadPoolExecutor executor = new ThreadPoolExecutor(6,6,1,TimeUnit.SECONDS,new ArrayBlockingQueue<>(1),new MyDefaultFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args)throws Exception {for (int i = 0; i < 6; i++)executor.submit(()->{System.out.println(Thread.currentThread().getName() + "国被灭!");c.countDown();});c.await();if (Thread.currentThread().getName().equals("main")) System.out.println("main线程执行结束:" + Thread.currentThread().getName() );}private static class MyDefaultFactory implements ThreadFactory{private static Queue<String> queue = new LinkedList();static {for (int i = 1; i <= 6; i++) queue.add(Objects.requireNonNull(Message.foreach_CountryEnum(i)).message);}@Overridepublic Thread newThread(Runnable r) {return new Thread(r,"thread-"+queue.poll() +"-er_shi");}}enum Message {ONE(1, "齐"), TWO(2, "楚"), THREE(3, "燕"), FOUR(4, "赵"), FIVE(5, "魏"), SIX(6, "韩");private int code;private String message;Message(int code, String message) {this.code = code;this.message = message;}public int getCode() {return code;}public void setCode(int code) {this.code = code;}public String getMessage() {return message;}public void setMessage(String message) {this.message = message;}public static Message foreach_CountryEnum(int index) {Message[] countryEnums = Message.values();for (Message countryEnum : countryEnums) {if (countryEnum.getCode() == index) {return countryEnum;}}return null;}}}
