注:文中代码的解释基本上都以注释的形式和代码写在一起

CountDownLatch是并发环境中常用的计数组件,也是基于AQS实现的。主要的方法有两个,countDown和await,实现了AQS模板方法的tryReleaseShared方法来完成countDown计数减的过程,实现了AQS模板方法的tryAcquireShared方法来实现await阻塞等待功能。

countDown方法

countDown方法源码如下,直接调用了内部类sync的releaseShared方法来实现,这里的Sync和ReentrantLock的内部类Sync一样,是继承了AQS的内部类,releaseShared方法正是AQS提供的共享模式的模板方法。

  1. public void countDown() {
  2. //直接调用了AQS的releaseShared
  3. sync.releaseShared(1);
  4. }

AQS的releaseShared方法源码如下

  1. public final boolean releaseShared(int arg) {
  2. //tryReleaseShared方法AQS留给子类自己实现
  3. //尝试将status减去arg,如果返回为true,执行doRelease方法,否则返回
  4. if (tryReleaseShared(arg)) {
  5. doReleaseShared();
  6. return true;
  7. }
  8. return false;
  9. }

releaseShared方法中调用了CountDownLatch中实现的tryReleaseShared方法,源码如下

  1. protected boolean tryReleaseShared(int releases) {
  2. //通过源码我们发现传入的参数releases并没有什么用,每次计数固定减一
  3. // 无限循环直到值减一成功或者status变成0
  4. for (;;) {
  5. //获取status的值,CountDownlatch中status的值代表要等待的总计数
  6. int c = getState();
  7. //如果已经是0了说明已经不能再减计数了,返回false
  8. if (c == 0)
  9. return false;
  10. int nextc = c-1;
  11. //CAS的方式将status减一
  12. if (compareAndSetState(c, nextc))
  13. 如果当前减完之后,status0,也就意味着计数结束了,返回true
  14. return nextc == 0;
  15. }
  16. }

tryReleaseShared方法返回为true,也就是计数结束时,会接着执行doReleaseShared方法。doReleaseShared方法在CountDownLatch中没有重写,直接调用的是AQS的doReleaseShared方法,源码如下,其中unparkSuccessor源码解析见另一篇博客《AQS源码解析》

  1. private void doReleaseShared() {
  2. //无限循环
  3. for (;;) {
  4. //获取头节点
  5. Node h = head;
  6. if (h != null && h != tail) {
  7. int ws = h.waitStatus;
  8. //如果头节点设置的是要唤醒下一个节点的等待状态
  9. if (ws == Node.SIGNAL) {
  10. //将节点的waitStatus设置成0
  11. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  12. continue; // loop to recheck cases
  13. //如果设置成功,唤醒后面的等待节点
  14. unparkSuccessor(h);
  15. }
  16. //ws==0说明第一步设置成功或者原先就是0
  17. //将其状态设置为PROPAGATE
  18. //失败(PROPAGATE状态表示同步状态将会无条件传播,意思就是节点可运行)
  19. else if (ws == 0 &&
  20. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  21. continue; // loop on failed CAS
  22. }
  23. //如果h还是头节点,就结束循环
  24. if (h == head) // loop if head changed
  25. break;
  26. }
  27. }

await方法

await方法直接调用了内部类Sync的acquireSharedInterruptibly方法,阻塞线程直到count为0,当前线程才能拿到锁(或者抛出异常也有可能结束阻塞)。源码如下:

  1. public void await() throws InterruptedException {
  2. //直接调用了内部类Sync的方法(其实是AQS的方法)
  3. sync.acquireSharedInterruptibly(1);
  4. }

acquireSharedInterruptibly方法,如果线程被中断过就抛出异常结束阻塞,不然就判断计数的值,为0就返回,等于当前阻塞的线程获得了锁,如果计数不为0,进入doAcquireSharedInterruptibly方法进行排队等待。源码如下:

  1. public final void acquireSharedInterruptibly(int arg)
  2. throws InterruptedException {
  3. //如果线程被中断过,抛出异常,线程不再等待
  4. if (Thread.interrupted())
  5. throw new InterruptedException();
  6. //tryAcquireShared尝试获取共享状态
  7. //tryAcquireShared源码见下方,实际就是获取计数
  8. //计数不为0则执行doAcquireSharedInterruptibly方法
  9. //计数为0则返回值大于0,方法直接返回,线程阻塞结束。等于线程获得了锁
  10. if (tryAcquireShared(arg) < 0)
  11. doAcquireSharedInterruptibly(arg);
  12. }
  13. protected int tryAcquireShared(int acquires) {
  14. return (getState() == 0) ? 1 : -1;
  15. }

doAcquireSharedInterruptibly方法只有在线程阻塞时会被调用,也就是计数不为0时被调用。方法将当前线程构造为一个共享模式的等待节点加入等待队列中,然后开启无限自循环,直到计数等于0获取到锁,或者抛出异常为止。源码如下:

  1. private void doAcquireSharedInterruptibly(int arg)
  2. throws InterruptedException {
  3. //新建一个共享模式节点加入等待队列
  4. final Node node = addWaiter(Node.SHARED);
  5. boolean failed = true;
  6. try {
  7. for (;;) {
  8. //获取新建节点的前一个节点
  9. final Node p = node.predecessor();
  10. //如果前一个就是头节点
  11. if (p == head) {
  12. //说明当前线程是第一个等待的,尝试获取锁
  13. //也就是获取计数
  14. int r = tryAcquireShared(arg);
  15. //r不小于0说明计数已经是0了,等于当前线程已经获得了锁
  16. if (r >= 0) {
  17. //将当前线程的节点设置成头节点
  18. setHeadAndPropagate(node, r);
  19. //原先的头节点p从队列中解除,便于垃圾回收
  20. p.next = null; // help GC
  21. failed = false;
  22. return;
  23. }
  24. }
  25. //shouldParkAfterFailedAcquire检查如果获取锁失败当前节点是否需要挂起
  26. //只有当前一个节点的waitStatus是SIGNAL也就是说前一个节点
  27. //获得锁以后会把自己唤醒,当前节点才能放心挂起
  28. //parkAndCheckInterrupt判断节点是否被中断过
  29. //这里意思是如果当前节点是在被挂起状态而且被中断过就抛出异常
  30. if (shouldParkAfterFailedAcquire(p, node) &&
  31. parkAndCheckInterrupt())
  32. throw new InterruptedException();
  33. }
  34. } finally {
  35. //如果failed为true说明线程被中断,没有获得锁
  36. //所以取消获取锁的动作
  37. if (failed)
  38. cancelAcquire(node);
  39. }
  40. }

总结来说就是await方法会让当前线程阻塞,进入方法后先判断一次计数是否为0,如果是0则直接返回,线程获得锁,阻塞结束。如果不为0则进入doAcquireSharedInterruptibly方法,将当前线程构造成了一个等待节点,开启无限循环,线程被阻塞。无限循环直到当前线程的节点排队排到了头节点的后面,就可以尝试获得锁了,如果成功了就可以返回,阻塞结束。在排队的过程中如果线程被中断那么就抛出异常。简而言之阻塞是由无限的for循环造成的,所以结束循环就是线程结束阻塞的关键了。

提问:await方法支持多个线程一起等待吗
回答:支持。从源码角度看,await方法并没有做任何的同步控制,多个线程等待和一个线程等待,结果没有什么不同,所有等待的线程都会阻塞到status为0,阻塞过程中这些线程就乖乖的在队列里等待。

搬运自我的简书 https://www.jianshu.com/p/2effb6b3e6c6