1、简介
看下CountDownLatch类图
看到CountDownLatch类有一个静态内部类Sync ,他是继承了AQS类,使用AQS的共享模式,那么它就可以调用AQS的相关方法。
对于CountdownLatch,简而言之就是,该类能实现让一个线程等待其他线程做完一些操作之后,再继续执行;也可以让一组线程去等待某一个线程执行完毕,然后再让一组线程继续执行。
2、 构造函数
CountDownLatch只提供了一个构造函数,传递一个int类型的初始值,该值可以称之为计数器,或者倒计时数,通过调用CountDownLatch.countDown()方法,能使之计数器或者倒计时数 减1,等到倒计时数==0时候,代表资源已经全部释放,需要唤醒阻塞队列里等待的线程
public CountDownLatch(int count) {//边界值判断if (count < 0) throw new IllegalArgumentException("count < 0");//实例化Sync类this.sync = new Sync(count);}
Sync(int count) {//调用AQS的setState方法setState(count);}
这里看到是最终给AQS的state值赋值,我们在ReentrantLock中知道,state值对应的是锁资源
protected final void setState(int newState) {state = newState;}
3、方法
- await() 线程调用该方法,如果锁资源未全部释放,会将线程以AQS的共享模式封装为node节点,添加到阻塞队列里,然后自旋被阻塞掉
countDown() 线程调用该方法,会让倒计时数减1,等待减一后 state ==0的时候,代表锁资源完全释放,会先唤醒阻塞队列的head.next节点,然后让head.next节点去唤醒后继阻塞的节点
4、await
是当前线程添加到阻塞队列挂起,直到锁资源完全释放被唤醒或者 该线程阻塞期间被中断唤醒
public void await() throws InterruptedException {//调用sync方法sync.acquireSharedInterruptibly(1);}
5、AQS.acquireSharedInterruptibly
先判断是否被中断过,如果有直接抛出中断异常
调用 tryAcquireShared(arg) 方法判断锁资源state是否等于0,没有说明锁资源未被释放,将线程入阻塞队列阻塞,等待唤醒public final void acquireSharedInterruptibly(int arg)throws InterruptedException {//条件成立: 说明调用await的线程已经是 中断状态了 直接抛出异常if (Thread.interrupted())throw new InterruptedException();//条件成立 AQS.state >0 此时将线程入队,然后阻塞等待被唤醒//条件不成立 AQS.state == 0 对应业务CountDownLatch已经countdown完了if (tryAcquireShared(arg) < 0)//将当前线程添加到共享锁阻塞队列doAcquireSharedInterruptibly(arg);}
6、CountDownLatch.tryAcquireShared
判断锁资源是否==0 ? 是返回1 ,不是返回 -1
protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}
7.AQS.doAcquireSharedInterruptibly
将线程添加到阻塞队列里挂起逻辑
线程封装为node,设置为共享模式
- 判断是否是head.next节点,是的话尝试获取锁,成功之后调用 setHeadAndPropagate 方法,cas修改node状态为0,并唤醒阻塞队列的后继节点
不是head.next节点,则给自己找一个好爸爸,节点修改 -1 (signal),然后阻塞挂起线程,挂起过程中被中断唤醒,修改node节点为取消状态,走移除node节点逻辑
//AQS的doAcquireSharedInterruptibly方法private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {//将调用await方法的线程封装为node 添加到AQS的阻塞队列中final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {//获取当前node的前驱节点final Node p = node.predecessor();//条件成立 说明当前node节点是head.next节点,有权利获取 共享锁if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}//给当前线程找到好爸爸,将好爸爸的状态设置为singal -1 ,返回true//parkAndCheckInterrupt 挂起node对应的线程if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {//阻塞过程中被中断唤醒之后,会抛出中断异常,然后将node节点设置为取消状态,走清除节点逻辑if (failed)cancelAcquire(node);}}
假设初始化了CountDownLatch(1),state==1,有三个线程(线程A,线程B,线程C)调用了await方法,那么这三个线程都会被添加到AQS的阻塞队列,然后被阻塞。那么此时阻塞队列的情况如下图所示:
8、countDown
递减倒计时数,待倒计时数==0时候,释放所有阻塞的线程
public void countDown() {//调用AQS的方法sync.releaseShared(1);}
9、AQS.releaseShared
调用tryReleaseShared 去让state 减1,state减为0时候返回 true,否则正常让state = state-1
- state锁资源释放完之后,需要唤醒那些调用await方法而被阻塞挂起的线程
//AQS的releaseSharedpublic final boolean releaseShared(int arg) {//条件成立 说明调用latch.countdown()方法的线程是让 state-1 ==0的线程,需要做 唤醒await状态的线程逻辑if (tryReleaseShared(arg)) {//调用countdown方法的线程 只有一个会进入到这个里面,去做唤醒 阻塞状态的线程逻辑doReleaseShared();return true;}return false;}
10、CountDownLatch.tryReleaseShared
每调用一次该方法让AQS的state 减一 逻辑。
自旋让state减一,待state 减为0 返回true,否则返回false//countdownlatch内部类sync的tryReleaseShared方法//更新AQS的state值,每调用一次,state减1,当state==0时 返回trueprotected boolean tryReleaseShared(int releases) {for (;;) {int c = getState();//条件成立 说明前面已经有线程 触发 唤醒操作了, 返回falseif (c == 0)return false;//执行到这里,state>0int nextc = c-1;//cas 修改stateif (compareAndSetState(c, nextc))//nextc == 0 说明当前调用countdown的线程是 需要触发 唤醒操作的线程return nextc == 0;}}
11、AQS.doReleaseShared
state锁释放完之后,唤醒 调用await方法而被阻塞的线程
共享锁唤醒逻辑 跟独占锁唤醒一样,都需要先将头节点状态 cas方式从 signal 修改为0,然后使用unpark方式 唤醒头节点的下一个节点
//AQS的doReleaseShared方法//有哪几种情况会调用当前方法?//1.latch.countdown让AQS.state==0 然后调用doReleaseShared方法去唤醒head.next对应的线程//2.被唤醒的线程在doAcquireSharedInterruptibly方法中调用setHeadAndPropagate方法,然后会调用doReleaseShared该方法private void doReleaseShared() {for (;;) {//获取当前AQS的head节点Node h = head;//条件一 h != null 成立 说明阻塞队列不为空//条件二 h != tail 成立说明当前阻塞队列不只有head一个节点//h ==tail 表示head和tail指向同一个node节点,什么时候会出现?//1.正常唤醒情况,唤醒最后一个node节点时候,head是等于tail的//2.第一个调用await的线程在准备addWaiter入队时给head节点擦屁股后还没有把自己放进阻塞队列时候 这时候与countdown方法发生并发了if (h != null && h != tail) {//执行到这里 说明当前head一定有后继节点int ws = h.waitStatus;if (ws == Node.SIGNAL) {//将当前node状态改为0//为什么用cas 多个线程唤醒head.next节点时候, 可能会失败//案例:t3线程在if (h == head) 返回false时 t3不会退出循环,会继续自旋 参与到唤醒下一个head.next逻辑//t3此时执行cas 成功..t4(head节点线程)在t3修改成功之前,也进入到这里代码块,t4会compareAndSetWaitStatus 修改失败,因为t3改过了if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;//唤醒head的后继节点unparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;}//条件成立//1. 说明刚被unaprk唤醒的后继节点还没有执行到setHeadAndPropagate方法里面的 设置当前后继node为head节点逻辑//2. h==null//3. h==tail head==tail指向一个node对象//条件不成立//被唤醒的节点 很积极 直接将自己设置为head节点, 此时 唤醒它的节点(前驱节点) 执行h==head不成立//此时head节点的前驱节点 不会跳出doReleaseShared方法,会继续唤醒 新head节点的后继节点..if (h == head) // loop if head changedbreak;}}
12、AQS.doAcquireSharedInterruptibly
线程A被unpark唤醒之后,线程A又会回到 doAcquireSharedInterruptibly 方法里面,会自旋判断当前节点是否是head.next节点,如果是并且满足state==0的条件,会调用setHeadAndPropagate方法
//AQS的doAcquireSharedInterruptibly方法private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {//将调用await方法的线程封装为node 添加到AQS的阻塞队列中final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {//获取当前node的前驱节点final Node p = node.predecessor();//条件成立 说明当前node节点是head.next节点,有权利获取 共享锁if (p == head) {//尝试去获取锁,共享锁获取锁是判断state==0是否成立int r = tryAcquireShared(arg);//条件成立,说明锁资源已释放完毕,if (r >= 0) {//让当前节点成为头节点,并让当前节点继续向后唤醒下一个节点setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}//给当前线程找到好爸爸,将好爸爸的状态设置为singal -1 ,返回true//parkAndCheckInterrupt 挂起node对应的线程if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {//阻塞过程中被中断唤醒之后,会抛出中断异常,然后将node节点设置为取消状态,走清除节点逻辑if (failed)cancelAcquire(node);}}
13、AQS.setHeadAndPropagate
该方法逻辑是将线程A设置为头节点,并唤醒线程A的下一个节点;一旦线程A被唤醒,并且被设置为头节点之后,就会继续唤醒线程A的后一个节点线程B。
//AQS的方法 设置当前node为 head节点,并向后传播(依次唤醒!)private void setHeadAndPropagate(Node node, int propagate) {Node h = head;//将当前节点设置为head节点setHead(node);//propagate 是1 一定成立if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {//获取当前节点的后继节点Node s = node.next;//条件一:s == null 成立 当前node节点已经是tail节点了,条件一成立 doReleaseShared会处理这种情况//条件二:前置条件 s!=null 那么要求s节点必须是 共享模式if (s == null || s.isShared())//基本上所有情况都会执行到这里doReleaseShared();}}
14、使用
public static void main(String[] args) {CountDownLatch latch = new CountDownLatch(5);ExecutorService executorService = Executors.newFixedThreadPool(5);for (int i = 0; i < 5; i++) {executorService.submit(new Worker(String.format("线程%s", i), latch));}try {long startTime = System.currentTimeMillis();System.out.println("主线程开始等待其他组件执行....");latch.await();long endTime = System.currentTimeMillis();System.out.println("主线程开始执行后续逻辑....,耗时" + (endTime - startTime));} catch (InterruptedException e) {e.printStackTrace();}}static class Worker implements Runnable {private String threadName;private CountDownLatch latch;public Worker(String threadName, CountDownLatch latch) {this.threadName = threadName;this.latch = latch;}@Overridepublic void run() {try {System.out.println(this.threadName + "开始执行逻辑...");Thread.sleep(5000);System.out.println(this.threadName + "执行完毕逻辑...");} catch (InterruptedException e) {e.printStackTrace();} finally {latch.countDown();}}
