一、CountDownLatch的使用

之前看Java并发编程艺术这本书的时候,在线程间的通信方式章节看到,CountDownLatch可以用于线程间的通信。常见场景如下,一群短跑运动员参赛时,在大家都做好起跑准备之后,只要发令枪一响,各个运动员都尝试以最快速度出发。另外,从这个名称计数器闭锁来看,可以用作倒时计数器的用途。举个例子,

  1. private void await() {
  2. CountDownLatch cdl = new CountDownLatch(5);
  3. Runnable task = () -> {
  4. try {
  5. LOGGER.info("be ready, waiting for suck.");
  6. cdl.await();
  7. LOGGER.info("try the best to run.");
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. };
  12. List<Thread> group = new ArrayList<>(10);
  13. int count = 5;
  14. for(int i = 0; i < count; i +=1) {
  15. group.add(new Thread(task, "learning-cdl-" + i));
  16. }
  17. group.parallelStream().forEach(t -> t.start());
  18. //5
  19. cdl.countDown();
  20. //4
  21. cdl.countDown();
  22. //3
  23. cdl.countDown();
  24. //2
  25. cdl.countDown();
  26. try {
  27. TimeUnit.SECONDS.sleep(1);
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. //1
  32. cdl.countDown();
  33. }

最主要的有两个方法,一个是await,另外一个是countDown。

二、独占锁 vs 共享锁

后来研究CountDownLatch底层实现源码时才知道,它是基于AQS的共享锁来实现的,之前仅仅浅浅的研究一下独占锁的实现方式。于是抽空借助CountDownLatch来研究一下共享锁的实现过程。
仅仅从名字上,就能发现独占锁与共享锁的最大区别,独占锁表示锁只能由一个线程持有,而共享锁可以由多个线程持有。独占锁,是互斥锁,当某个线程持有锁时,其他线程都需要在一个队列中排队等待,只有持有锁的线程使用完锁资源后释放掉锁后,其他线程才能争抢。而如果某个线程持有共享锁,如果其他线程过来争抢时,极大可能会成功抢占共享锁。

独占锁 共享锁
boolean tryAcquire(int arg) int tryAcquireShared(int arg)
boolean tryAcquireNanos(int arg, long nanosTimeout) boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
void acquire(int arg) void acquireShared(int arg)
void acquireInterruptibly(int arg) void acquireSharedInterruptibly(int arg)
boolean tryRelease(int arg) boolean tryReleaseShared(int arg)
boolean release(int arg) boolean releaseShared(int arg)
void doAcquireInterruptibly(int arg) private void doAcquireSharedInterruptibly(int arg)
boolean doAcquireNanos(int arg, long nanosTimeout) boolean doAcquireSharedNanos(int arg, long nanosTimeout)
void doReleaseShared()

可以看出,除了最后一个属于共享锁的doReleaseShared以外,其他的方法,独占锁和共享锁都是一一对应的。其实doReleaseSharedy与unparkSuccessor是对应的,不过前者包含了后者,还有一些其他的处理逻辑。
另外,持有独占锁的线程在释放锁后,会唤醒后继节点对应的线程,共享锁的释放就不是这样了。在共享锁模式下,如果一个线程获取到共享锁,那么就可以唤醒后继节点了,这样在线程不释放共享锁时,其他线程也可以获取共享锁。概括的说,在共享锁模式下,线程在获取锁和释放锁时,都会唤醒后继节点。

三、共享锁的获取

CountDownLatch的await方法就是获取共享锁的逻辑。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

  1. private static final class Sync extends AbstractQueuedSynchronizer {
  2. private static final long serialVersionUID = 4982264981922014374L;
  3. Sync(int count) {
  4. setState(count);
  5. }
  6. int getCount() {
  7. return getState();
  8. }
  9. //尝试获取共享锁,如果state等于0,就获取锁成功
  10. protected int tryAcquireShared(int acquires) {
  11. return (getState() == 0) ? 1 : -1;
  12. }
  13. protected boolean tryReleaseShared(int releases) {
  14. // Decrement count; signal when transition to zero
  15. for (;;) {
  16. int c = getState();
  17. if (c == 0)
  18. return false;
  19. int nextc = c-1;
  20. if (compareAndSetState(c, nextc))
  21. return nextc == 0;
  22. }
  23. }
  24. }

CountDownLatch内部定义了一个继承AQS的Sync组件,这个组件重写了获取锁和释放锁的逻辑。下面一层一层分析代码。

  1. sync.acquireSharedInterruptibly(1);
  2. public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
  3. if (Thread.interrupted())
  4. throw new InterruptedException();
  5. //对于CountDownLatch来说,如果state不等于0,就表示获取锁失败,获取锁失败后,线程需要入队列;
  6. //等于0,就成功获取共享锁。
  7. if (tryAcquireShared(arg) < 0){
  8. doAcquireSharedInterruptibly(arg);
  9. }
  10. }

说明:
1、如果state大于0,表示尝试获取共享锁失败,线程需要进入队列中挂起;创建Latch时,默认初始化state=4,也就是tryAcquireShared(1)会返回-1的。
2、如果state等于0时,线程再去调用cdl.await会直接返回的,表示线程成功获取共享锁。

响应线程中断的获取共享锁代码如下:

  1. doAcquireSharedInterruptibly(1);
  2. private void doAcquireSharedInterruptibly(int arg)
  3. throws InterruptedException {
  4. //将当前线程包装成Node,如果队列没有元素,则设置头节点(头节点为哑节点),再添加到队列的尾部
  5. final Node node = addWaiter(Node.SHARED);
  6. boolean failed = true;
  7. try {
  8. for (;;) {
  9. final Node p = node.predecessor();
  10. if (p == head) {
  11. //对于CountDownLatch来说,只要计数没有减少至0,那么r == -1的,获取共享锁失败。
  12. //前面抢占锁失败,如果当前线程对应头节点,那么这里再去尝试获取锁
  13. int r = tryAcquireShared(arg);
  14. if (r >= 0) {
  15. setHeadAndPropagate(node, r);
  16. p.next = null; // help GC
  17. failed = false;
  18. return;
  19. }
  20. }
  21. //获取锁失败后,需要先将前一节点设置为SIGNAL(-1),并挂起当前线程
  22. if (shouldParkAfterFailedAcquire(p, node) &&
  23. parkAndCheckInterrupt()){
  24. throw new InterruptedException();
  25. }
  26. }
  27. } finally {
  28. if (failed)
  29. cancelAcquire(node);
  30. }
  31. }

说明:
1、其实这段doAcquireSharedInterruptibly与doAcquireInterruptilby结构没有什么不同的,主要有2处处理不同。可以看出,同独占锁的获取方式类似,如果抢占锁失败,线程是要入队列的。这里的获取共享锁失败的线程,也通过addWaiter添加到队列尾部。

addWaiter(Node.EXCLUSIVE) addWaiter(Node.SHARED)
if (p == head && tryAcquire(arg)) {
setHead(node);

return;
}
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);

return;
}
}

我们回顾一下Node的结构

  1. static final class Node {
  2. static final Node SHARED = new Node();
  3. static final Node EXCLUSIVE = null;
  4. volatile int waitStatus;
  5. volatile Node prev;
  6. volatile Node next;
  7. volatile Thread thread;
  8. Node nextWaiter;
  9. // Used by addWaiter
  10. Node(Thread thread, Node mode) {
  11. this.nextWaiter = mode;
  12. this.thread = thread;
  13. }
  14. }

大家不要认为这里nextWaiter可以辅助形成一个单项链表,它的赋值是一个固定的对象,所以也仅仅是起到一个标识的作用。
2、只有线程将CountDownLatch的计数减少到0,这样tryAcquireShared才返回1(即成功获取锁),才能进行setHeadAndPropagate,否则直接进行shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()。
3、对于独占锁来说,setHead仅仅是将获取到锁的线程出队列(可以这么理解,将节点的thread设置为null,其实是保存在AQS.exclusiveOwnerThread,prev设置为null)。对于共享锁来说,setHeadAndPropagate不仅调用了setHead,还在一定条件下,调用了doReleaseShared,这个doReleaseShared的逻辑是释放共享锁,并唤醒后继节点。

如果某一线程成功获取共享锁之后,需要做什么事情呢?接着进入

  1. setHeadAndPropagate(node, 1)
  2. private void setHeadAndPropagate(Node node, int propagate) {
  3. // Record old head for check below
  4. Node h = head;
  5. setHead(node);
  6. if (propagate > 0 || h == null || h.waitStatus < 0 ||
  7. (h = head) == null || h.waitStatus < 0) {
  8. Node s = node.next;
  9. if (s == null || s.isShared())
  10. doReleaseShared();
  11. }
  12. }

说明:
1、既然调用了setHeadAndPropagate,就说明CountDownLatch的计数器值减少到0了,也就是说等待队列中线程都可以尝试获取共享锁,所以setHeadAndPropagate的作用应该是释放共享锁,并唤醒头节点的后继线程。
至于为什么会是这样呢?在共享锁模式下,锁可以被多个线程持有,既然当前线程已经成功获取锁,那么就可以直接通知后继节点尝试获取共享锁,而没有必要等到共享锁释放以后,再去通知后继节点。
2、doReleaseShared是唤醒队列中的后继节操作,没有释放共享锁,因为state已经是0,不需要,后面会分析。

四、共享锁的释放

public void countDown() {
sync.releaseShared(1);
}

  1. releaseShared(1)
  2. public final boolean releaseShared(int arg) {
  3. if (tryReleaseShared(arg)) {
  4. doReleaseShared();
  5. return true;
  6. }
  7. return false;
  8. }

共享锁的releaseShared方法对应独占锁的release方法。

  1. public final boolean release(int arg) {
  2. if (tryRelease(arg)) {
  3. Node h = head;
  4. if (h != null && h.waitStatus != 0)
  5. unparkSuccessor(h);
  6. return true;
  7. }
  8. return false;
  9. }

在独占锁模式下,head节点就是对应成功释放锁的线程包装而成的节点,所以它发现自己的waitStatus不等于0(也就是Node.SIGNAL)时,会调用unparkSuccessor唤醒后继节点。

  1. tryReleaseShared(1)
  2. protected boolean tryReleaseShared(int releases) {
  3. // Decrement count; signal when transition to zero
  4. for (;;) {
  5. int c = getState();
  6. if (c == 0)
  7. return false;
  8. int nextc = c-1;
  9. if (compareAndSetState(c, nextc))
  10. return nextc == 0;
  11. }
  12. }

说明:
1、共享锁的释放逻辑也很简单明了,通过CAS操作修改state的值,做减一操作,每个成功获取到共享锁的线程都需要通过tryReleaseShared释放锁,尝试释放共享锁的成功与否,取决于state的值是否为0。
2、在countDown调用以后,若state没有减少到0,表示尝试释放共享锁失败,也就不需要调用doReleaseShared来唤醒队列中后继节点对应的线程。

如果成功释放共享锁,也就是state现在是0,那么需要唤醒队列中的挂起线程。继续看,

  1. private void doReleaseShared() {
  2. for (;;) {
  3. Node h = head;
  4. if (h != null && h != tail) {
  5. int ws = h.waitStatus;
  6. if (ws == Node.SIGNAL) {
  7. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)){
  8. // loop to recheck cases
  9. continue;
  10. }
  11. unparkSuccessor(h);
  12. }
  13. else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)){
  14. // loop on failed CAS
  15. continue;
  16. }
  17. }
  18. if (h == head){
  19. // loop if head changed
  20. break;
  21. }
  22. }
  23. }

需要理清楚4个问题,分别是:
a 该方法都在什么地方调用过?
有两处,一处是在成功获取到共享锁后(也就是tryAcquireShared返回值大于0),满足一定条件时,在setHeadAndPropagate中有调用。另外一处是,在成功释放共享锁后调用。

b 调用该方法的线程是谁?
在共享锁场景中,可以有多个线程持有共享锁,这些线程都可以调用doReleaseShared来释放锁。而这些线程想要成功获取到共享锁(即tryAcquireShared返回值大于0),那么这些线程要么曾经成为过头节点,或者就是现在的头节点。因此,如果是在releaseShared中调用的doReleaseShared,那么当前线程有可能不是头节点所对应的线程了,因为头节点可能会易主好几次了。

c 调用该方法的目的是什么?
无论是在setHeadAndPropagate,还是releaseShared中调用,doReleaseShared的作用都是一样的,那就是唤醒后继节点,这一点跟独占锁很像,但二者有着一个重要的差别。线程A成功获取共享锁后,会唤醒它的后继节点(对应线程B,线程B的后继节点对应线程C),如果线程A还没有执行完,但是线程B已经成为头节点,那么循环会继续进行,接着唤醒线程C。也就存在一种可能,一个线程就可以将等待队列中的某几个节点对应的线程都唤醒,而不是像独占锁(公平锁)的释放那样,一次只能唤醒队列的头节点对应的那个线程。

d 退出该方法的条件是什么?

在自旋时,如果head没有易主,那么跳出循环。

再接着仔细分析

  1. for (;;) {
  2. Node h = head;
  3. if (h != null && h != tail) {
  4. int ws = h.waitStatus;
  5. //第一个
  6. if (ws == Node.SIGNAL) {
  7. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)){
  8. // loop to recheck cases
  9. continue;
  10. }
  11. unparkSuccessor(h);
  12. }
  13. //第二个
  14. else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)){
  15. // loop on failed CAS
  16. continue;
  17. }
  18. }
  19. if (h == head){
  20. // loop if head changed
  21. break;
  22. }
  23. }

1、第一个if分支,头节点waitStatus等于Node.SIGNAL时,我们都知道,在独占锁场景下,是需要唤醒后继节点对应的线程的。这里也是一样。
2、另外一个else if分支,头节点的waitStatus等于0说明了头节点的next节点刚刚进入队列,在挂起之前将其pre节点(也就是头节点)的状态ws设置为SIGNAL的操作还没来的及执行。而后面的compareAndSetWaitStatus设置失败,表明这个时候通过shouldParkAfterFailedAcquire设置ws为SIGNAL恰恰成功,这个时候就不给当前线程break的机会了,而是接着进入下一轮循环,下一轮循环就进行1中的操作,一处条件承接了waitStatus的两种变化,细微处可见精彩,这种场景也考虑到了。
3、为什么要将节点的waitStatus设置成Node.PROPAGATE?不要第二个分支,有没有问题呢?

参考:
1 逐行分析AQS源码,共享锁的获取与释放
https://segmentfault.com/a/1190000016447307
2 AQS同步状态的获取与释放
http://cmsblogs.com/?p=2197#i-7