一、CountDownLatch的使用
之前看Java并发编程艺术这本书的时候,在线程间的通信方式章节看到,CountDownLatch可以用于线程间的通信。常见场景如下,一群短跑运动员参赛时,在大家都做好起跑准备之后,只要发令枪一响,各个运动员都尝试以最快速度出发。另外,从这个名称计数器闭锁来看,可以用作倒时计数器的用途。举个例子,
private void await() {
CountDownLatch cdl = new CountDownLatch(5);
Runnable task = () -> {
try {
LOGGER.info("be ready, waiting for suck.");
cdl.await();
LOGGER.info("try the best to run.");
} catch (InterruptedException e) {
e.printStackTrace();
}
};
List<Thread> group = new ArrayList<>(10);
int count = 5;
for(int i = 0; i < count; i +=1) {
group.add(new Thread(task, "learning-cdl-" + i));
}
group.parallelStream().forEach(t -> t.start());
//5
cdl.countDown();
//4
cdl.countDown();
//3
cdl.countDown();
//2
cdl.countDown();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//1
cdl.countDown();
}
最主要的有两个方法,一个是await,另外一个是countDown。
二、独占锁 vs 共享锁
后来研究CountDownLatch底层实现源码时才知道,它是基于AQS的共享锁来实现的,之前仅仅浅浅的研究一下独占锁的实现方式。于是抽空借助CountDownLatch来研究一下共享锁的实现过程。
仅仅从名字上,就能发现独占锁与共享锁的最大区别,独占锁表示锁只能由一个线程持有,而共享锁可以由多个线程持有。独占锁,是互斥锁,当某个线程持有锁时,其他线程都需要在一个队列中排队等待,只有持有锁的线程使用完锁资源后释放掉锁后,其他线程才能争抢。而如果某个线程持有共享锁,如果其他线程过来争抢时,极大可能会成功抢占共享锁。
独占锁 | 共享锁 |
---|---|
boolean tryAcquire(int arg) | int tryAcquireShared(int arg) |
boolean tryAcquireNanos(int arg, long nanosTimeout) | boolean tryAcquireSharedNanos(int arg, long nanosTimeout) |
void acquire(int arg) | void acquireShared(int arg) |
void acquireInterruptibly(int arg) | void acquireSharedInterruptibly(int arg) |
boolean tryRelease(int arg) | boolean tryReleaseShared(int arg) |
boolean release(int arg) | boolean releaseShared(int arg) |
void doAcquireInterruptibly(int arg) | private void doAcquireSharedInterruptibly(int arg) |
boolean doAcquireNanos(int arg, long nanosTimeout) | boolean doAcquireSharedNanos(int arg, long nanosTimeout) |
— | void doReleaseShared() |
可以看出,除了最后一个属于共享锁的doReleaseShared以外,其他的方法,独占锁和共享锁都是一一对应的。其实doReleaseSharedy与unparkSuccessor是对应的,不过前者包含了后者,还有一些其他的处理逻辑。
另外,持有独占锁的线程在释放锁后,会唤醒后继节点对应的线程,共享锁的释放就不是这样了。在共享锁模式下,如果一个线程获取到共享锁,那么就可以唤醒后继节点了,这样在线程不释放共享锁时,其他线程也可以获取共享锁。概括的说,在共享锁模式下,线程在获取锁和释放锁时,都会唤醒后继节点。
三、共享锁的获取
CountDownLatch的await方法就是获取共享锁的逻辑。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
//尝试获取共享锁,如果state等于0,就获取锁成功
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
CountDownLatch内部定义了一个继承AQS的Sync组件,这个组件重写了获取锁和释放锁的逻辑。下面一层一层分析代码。
sync.acquireSharedInterruptibly(1);
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//对于CountDownLatch来说,如果state不等于0,就表示获取锁失败,获取锁失败后,线程需要入队列;
//等于0,就成功获取共享锁。
if (tryAcquireShared(arg) < 0){
doAcquireSharedInterruptibly(arg);
}
}
说明:
1、如果state大于0,表示尝试获取共享锁失败,线程需要进入队列中挂起;创建Latch时,默认初始化state=4,也就是tryAcquireShared(1)会返回-1的。
2、如果state等于0时,线程再去调用cdl.await会直接返回的,表示线程成功获取共享锁。
响应线程中断的获取共享锁代码如下:
doAcquireSharedInterruptibly(1);
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//将当前线程包装成Node,如果队列没有元素,则设置头节点(头节点为哑节点),再添加到队列的尾部
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//对于CountDownLatch来说,只要计数没有减少至0,那么r == -1的,获取共享锁失败。
//前面抢占锁失败,如果当前线程对应头节点,那么这里再去尝试获取锁
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//获取锁失败后,需要先将前一节点设置为SIGNAL(-1),并挂起当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()){
throw new InterruptedException();
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
说明:
1、其实这段doAcquireSharedInterruptibly与doAcquireInterruptilby结构没有什么不同的,主要有2处处理不同。可以看出,同独占锁的获取方式类似,如果抢占锁失败,线程是要入队列的。这里的获取共享锁失败的线程,也通过addWaiter添加到队列尾部。
addWaiter(Node.EXCLUSIVE) | addWaiter(Node.SHARED) |
---|---|
if (p == head && tryAcquire(arg)) { setHead(node); … return; } |
if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); … return; } } |
我们回顾一下Node的结构
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
// Used by addWaiter
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
}
大家不要认为这里nextWaiter可以辅助形成一个单项链表,它的赋值是一个固定的对象,所以也仅仅是起到一个标识的作用。
2、只有线程将CountDownLatch的计数减少到0,这样tryAcquireShared才返回1(即成功获取锁),才能进行setHeadAndPropagate,否则直接进行shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()。
3、对于独占锁来说,setHead仅仅是将获取到锁的线程出队列(可以这么理解,将节点的thread设置为null,其实是保存在AQS.exclusiveOwnerThread,prev设置为null)。对于共享锁来说,setHeadAndPropagate不仅调用了setHead,还在一定条件下,调用了doReleaseShared,这个doReleaseShared的逻辑是释放共享锁,并唤醒后继节点。
如果某一线程成功获取共享锁之后,需要做什么事情呢?接着进入
setHeadAndPropagate(node, 1)
private void setHeadAndPropagate(Node node, int propagate) {
// Record old head for check below
Node h = head;
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
说明:
1、既然调用了setHeadAndPropagate,就说明CountDownLatch的计数器值减少到0了,也就是说等待队列中线程都可以尝试获取共享锁,所以setHeadAndPropagate的作用应该是释放共享锁,并唤醒头节点的后继线程。
至于为什么会是这样呢?在共享锁模式下,锁可以被多个线程持有,既然当前线程已经成功获取锁,那么就可以直接通知后继节点尝试获取共享锁,而没有必要等到共享锁释放以后,再去通知后继节点。
2、doReleaseShared是唤醒队列中的后继节操作,没有释放共享锁,因为state已经是0,不需要,后面会分析。
四、共享锁的释放
public void countDown() {
sync.releaseShared(1);
}
releaseShared(1)
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
共享锁的releaseShared方法对应独占锁的release方法。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
在独占锁模式下,head节点就是对应成功释放锁的线程包装而成的节点,所以它发现自己的waitStatus不等于0(也就是Node.SIGNAL)时,会调用unparkSuccessor唤醒后继节点。
tryReleaseShared(1)
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
说明:
1、共享锁的释放逻辑也很简单明了,通过CAS操作修改state的值,做减一操作,每个成功获取到共享锁的线程都需要通过tryReleaseShared释放锁,尝试释放共享锁的成功与否,取决于state的值是否为0。
2、在countDown调用以后,若state没有减少到0,表示尝试释放共享锁失败,也就不需要调用doReleaseShared来唤醒队列中后继节点对应的线程。
如果成功释放共享锁,也就是state现在是0,那么需要唤醒队列中的挂起线程。继续看,
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)){
// loop to recheck cases
continue;
}
unparkSuccessor(h);
}
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)){
// loop on failed CAS
continue;
}
}
if (h == head){
// loop if head changed
break;
}
}
}
需要理清楚4个问题,分别是:
a 该方法都在什么地方调用过?
有两处,一处是在成功获取到共享锁后(也就是tryAcquireShared返回值大于0),满足一定条件时,在setHeadAndPropagate中有调用。另外一处是,在成功释放共享锁后调用。
b 调用该方法的线程是谁?
在共享锁场景中,可以有多个线程持有共享锁,这些线程都可以调用doReleaseShared来释放锁。而这些线程想要成功获取到共享锁(即tryAcquireShared返回值大于0),那么这些线程要么曾经成为过头节点,或者就是现在的头节点。因此,如果是在releaseShared中调用的doReleaseShared,那么当前线程有可能不是头节点所对应的线程了,因为头节点可能会易主好几次了。
c 调用该方法的目的是什么?
无论是在setHeadAndPropagate,还是releaseShared中调用,doReleaseShared的作用都是一样的,那就是唤醒后继节点,这一点跟独占锁很像,但二者有着一个重要的差别。线程A成功获取共享锁后,会唤醒它的后继节点(对应线程B,线程B的后继节点对应线程C),如果线程A还没有执行完,但是线程B已经成为头节点,那么循环会继续进行,接着唤醒线程C。也就存在一种可能,一个线程就可以将等待队列中的某几个节点对应的线程都唤醒,而不是像独占锁(公平锁)的释放那样,一次只能唤醒队列的头节点对应的那个线程。
d 退出该方法的条件是什么?
在自旋时,如果head没有易主,那么跳出循环。
再接着仔细分析
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//第一个
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)){
// loop to recheck cases
continue;
}
unparkSuccessor(h);
}
//第二个
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)){
// loop on failed CAS
continue;
}
}
if (h == head){
// loop if head changed
break;
}
}
1、第一个if分支,头节点waitStatus等于Node.SIGNAL时,我们都知道,在独占锁场景下,是需要唤醒后继节点对应的线程的。这里也是一样。
2、另外一个else if分支,头节点的waitStatus等于0说明了头节点的next节点刚刚进入队列,在挂起之前将其pre节点(也就是头节点)的状态ws设置为SIGNAL的操作还没来的及执行。而后面的compareAndSetWaitStatus设置失败,表明这个时候通过shouldParkAfterFailedAcquire设置ws为SIGNAL恰恰成功,这个时候就不给当前线程break的机会了,而是接着进入下一轮循环,下一轮循环就进行1中的操作,一处条件承接了waitStatus的两种变化,细微处可见精彩,这种场景也考虑到了。
3、为什么要将节点的waitStatus设置成Node.PROPAGATE?不要第二个分支,有没有问题呢?
参考:
1 逐行分析AQS源码,共享锁的获取与释放
https://segmentfault.com/a/1190000016447307
2 AQS同步状态的获取与释放
http://cmsblogs.com/?p=2197#i-7