一、CountDownLatch的使用
之前看Java并发编程艺术这本书的时候,在线程间的通信方式章节看到,CountDownLatch可以用于线程间的通信。常见场景如下,一群短跑运动员参赛时,在大家都做好起跑准备之后,只要发令枪一响,各个运动员都尝试以最快速度出发。另外,从这个名称计数器闭锁来看,可以用作倒时计数器的用途。举个例子,
private void await() {CountDownLatch cdl = new CountDownLatch(5);Runnable task = () -> {try {LOGGER.info("be ready, waiting for suck.");cdl.await();LOGGER.info("try the best to run.");} catch (InterruptedException e) {e.printStackTrace();}};List<Thread> group = new ArrayList<>(10);int count = 5;for(int i = 0; i < count; i +=1) {group.add(new Thread(task, "learning-cdl-" + i));}group.parallelStream().forEach(t -> t.start());//5cdl.countDown();//4cdl.countDown();//3cdl.countDown();//2cdl.countDown();try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}//1cdl.countDown();}
最主要的有两个方法,一个是await,另外一个是countDown。
二、独占锁 vs 共享锁
后来研究CountDownLatch底层实现源码时才知道,它是基于AQS的共享锁来实现的,之前仅仅浅浅的研究一下独占锁的实现方式。于是抽空借助CountDownLatch来研究一下共享锁的实现过程。
仅仅从名字上,就能发现独占锁与共享锁的最大区别,独占锁表示锁只能由一个线程持有,而共享锁可以由多个线程持有。独占锁,是互斥锁,当某个线程持有锁时,其他线程都需要在一个队列中排队等待,只有持有锁的线程使用完锁资源后释放掉锁后,其他线程才能争抢。而如果某个线程持有共享锁,如果其他线程过来争抢时,极大可能会成功抢占共享锁。
| 独占锁 | 共享锁 |
|---|---|
| boolean tryAcquire(int arg) | int tryAcquireShared(int arg) |
| boolean tryAcquireNanos(int arg, long nanosTimeout) | boolean tryAcquireSharedNanos(int arg, long nanosTimeout) |
| void acquire(int arg) | void acquireShared(int arg) |
| void acquireInterruptibly(int arg) | void acquireSharedInterruptibly(int arg) |
| boolean tryRelease(int arg) | boolean tryReleaseShared(int arg) |
| boolean release(int arg) | boolean releaseShared(int arg) |
| void doAcquireInterruptibly(int arg) | private void doAcquireSharedInterruptibly(int arg) |
| boolean doAcquireNanos(int arg, long nanosTimeout) | boolean doAcquireSharedNanos(int arg, long nanosTimeout) |
| — | void doReleaseShared() |
可以看出,除了最后一个属于共享锁的doReleaseShared以外,其他的方法,独占锁和共享锁都是一一对应的。其实doReleaseSharedy与unparkSuccessor是对应的,不过前者包含了后者,还有一些其他的处理逻辑。
另外,持有独占锁的线程在释放锁后,会唤醒后继节点对应的线程,共享锁的释放就不是这样了。在共享锁模式下,如果一个线程获取到共享锁,那么就可以唤醒后继节点了,这样在线程不释放共享锁时,其他线程也可以获取共享锁。概括的说,在共享锁模式下,线程在获取锁和释放锁时,都会唤醒后继节点。
三、共享锁的获取
CountDownLatch的await方法就是获取共享锁的逻辑。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;Sync(int count) {setState(count);}int getCount() {return getState();}//尝试获取共享锁,如果state等于0,就获取锁成功protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}}
CountDownLatch内部定义了一个继承AQS的Sync组件,这个组件重写了获取锁和释放锁的逻辑。下面一层一层分析代码。
sync.acquireSharedInterruptibly(1);public final void acquireSharedInterruptibly(int arg) throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();//对于CountDownLatch来说,如果state不等于0,就表示获取锁失败,获取锁失败后,线程需要入队列;//等于0,就成功获取共享锁。if (tryAcquireShared(arg) < 0){doAcquireSharedInterruptibly(arg);}}
说明:
1、如果state大于0,表示尝试获取共享锁失败,线程需要进入队列中挂起;创建Latch时,默认初始化state=4,也就是tryAcquireShared(1)会返回-1的。
2、如果state等于0时,线程再去调用cdl.await会直接返回的,表示线程成功获取共享锁。
响应线程中断的获取共享锁代码如下:
doAcquireSharedInterruptibly(1);private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {//将当前线程包装成Node,如果队列没有元素,则设置头节点(头节点为哑节点),再添加到队列的尾部final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {//对于CountDownLatch来说,只要计数没有减少至0,那么r == -1的,获取共享锁失败。//前面抢占锁失败,如果当前线程对应头节点,那么这里再去尝试获取锁int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}//获取锁失败后,需要先将前一节点设置为SIGNAL(-1),并挂起当前线程if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()){throw new InterruptedException();}}} finally {if (failed)cancelAcquire(node);}}
说明:
1、其实这段doAcquireSharedInterruptibly与doAcquireInterruptilby结构没有什么不同的,主要有2处处理不同。可以看出,同独占锁的获取方式类似,如果抢占锁失败,线程是要入队列的。这里的获取共享锁失败的线程,也通过addWaiter添加到队列尾部。
| addWaiter(Node.EXCLUSIVE) | addWaiter(Node.SHARED) |
|---|---|
| if (p == head && tryAcquire(arg)) { setHead(node); … return; } |
if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); … return; } } |
我们回顾一下Node的结构
static final class Node {static final Node SHARED = new Node();static final Node EXCLUSIVE = null;volatile int waitStatus;volatile Node prev;volatile Node next;volatile Thread thread;Node nextWaiter;// Used by addWaiterNode(Thread thread, Node mode) {this.nextWaiter = mode;this.thread = thread;}}
大家不要认为这里nextWaiter可以辅助形成一个单项链表,它的赋值是一个固定的对象,所以也仅仅是起到一个标识的作用。
2、只有线程将CountDownLatch的计数减少到0,这样tryAcquireShared才返回1(即成功获取锁),才能进行setHeadAndPropagate,否则直接进行shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()。
3、对于独占锁来说,setHead仅仅是将获取到锁的线程出队列(可以这么理解,将节点的thread设置为null,其实是保存在AQS.exclusiveOwnerThread,prev设置为null)。对于共享锁来说,setHeadAndPropagate不仅调用了setHead,还在一定条件下,调用了doReleaseShared,这个doReleaseShared的逻辑是释放共享锁,并唤醒后继节点。
如果某一线程成功获取共享锁之后,需要做什么事情呢?接着进入
setHeadAndPropagate(node, 1)private void setHeadAndPropagate(Node node, int propagate) {// Record old head for check belowNode h = head;setHead(node);if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();}}
说明:
1、既然调用了setHeadAndPropagate,就说明CountDownLatch的计数器值减少到0了,也就是说等待队列中线程都可以尝试获取共享锁,所以setHeadAndPropagate的作用应该是释放共享锁,并唤醒头节点的后继线程。
至于为什么会是这样呢?在共享锁模式下,锁可以被多个线程持有,既然当前线程已经成功获取锁,那么就可以直接通知后继节点尝试获取共享锁,而没有必要等到共享锁释放以后,再去通知后继节点。
2、doReleaseShared是唤醒队列中的后继节操作,没有释放共享锁,因为state已经是0,不需要,后面会分析。
四、共享锁的释放
public void countDown() {
sync.releaseShared(1);
}
releaseShared(1)public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}
共享锁的releaseShared方法对应独占锁的release方法。
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}
在独占锁模式下,head节点就是对应成功释放锁的线程包装而成的节点,所以它发现自己的waitStatus不等于0(也就是Node.SIGNAL)时,会调用unparkSuccessor唤醒后继节点。
tryReleaseShared(1)protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}
说明:
1、共享锁的释放逻辑也很简单明了,通过CAS操作修改state的值,做减一操作,每个成功获取到共享锁的线程都需要通过tryReleaseShared释放锁,尝试释放共享锁的成功与否,取决于state的值是否为0。
2、在countDown调用以后,若state没有减少到0,表示尝试释放共享锁失败,也就不需要调用doReleaseShared来唤醒队列中后继节点对应的线程。
如果成功释放共享锁,也就是state现在是0,那么需要唤醒队列中的挂起线程。继续看,
private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)){// loop to recheck casescontinue;}unparkSuccessor(h);}else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)){// loop on failed CAScontinue;}}if (h == head){// loop if head changedbreak;}}}
需要理清楚4个问题,分别是:
a 该方法都在什么地方调用过?
有两处,一处是在成功获取到共享锁后(也就是tryAcquireShared返回值大于0),满足一定条件时,在setHeadAndPropagate中有调用。另外一处是,在成功释放共享锁后调用。
b 调用该方法的线程是谁?
在共享锁场景中,可以有多个线程持有共享锁,这些线程都可以调用doReleaseShared来释放锁。而这些线程想要成功获取到共享锁(即tryAcquireShared返回值大于0),那么这些线程要么曾经成为过头节点,或者就是现在的头节点。因此,如果是在releaseShared中调用的doReleaseShared,那么当前线程有可能不是头节点所对应的线程了,因为头节点可能会易主好几次了。
c 调用该方法的目的是什么?
无论是在setHeadAndPropagate,还是releaseShared中调用,doReleaseShared的作用都是一样的,那就是唤醒后继节点,这一点跟独占锁很像,但二者有着一个重要的差别。线程A成功获取共享锁后,会唤醒它的后继节点(对应线程B,线程B的后继节点对应线程C),如果线程A还没有执行完,但是线程B已经成为头节点,那么循环会继续进行,接着唤醒线程C。也就存在一种可能,一个线程就可以将等待队列中的某几个节点对应的线程都唤醒,而不是像独占锁(公平锁)的释放那样,一次只能唤醒队列的头节点对应的那个线程。
d 退出该方法的条件是什么?
在自旋时,如果head没有易主,那么跳出循环。
再接着仔细分析
for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;//第一个if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)){// loop to recheck casescontinue;}unparkSuccessor(h);}//第二个else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)){// loop on failed CAScontinue;}}if (h == head){// loop if head changedbreak;}}
1、第一个if分支,头节点waitStatus等于Node.SIGNAL时,我们都知道,在独占锁场景下,是需要唤醒后继节点对应的线程的。这里也是一样。
2、另外一个else if分支,头节点的waitStatus等于0说明了头节点的next节点刚刚进入队列,在挂起之前将其pre节点(也就是头节点)的状态ws设置为SIGNAL的操作还没来的及执行。而后面的compareAndSetWaitStatus设置失败,表明这个时候通过shouldParkAfterFailedAcquire设置ws为SIGNAL恰恰成功,这个时候就不给当前线程break的机会了,而是接着进入下一轮循环,下一轮循环就进行1中的操作,一处条件承接了waitStatus的两种变化,细微处可见精彩,这种场景也考虑到了。
3、为什么要将节点的waitStatus设置成Node.PROPAGATE?不要第二个分支,有没有问题呢?
参考:
1 逐行分析AQS源码,共享锁的获取与释放
https://segmentfault.com/a/1190000016447307
2 AQS同步状态的获取与释放
http://cmsblogs.com/?p=2197#i-7
