引言
CountDownLatch是一个很经典的多线程工具。它提供的能力很好理解,就是让一个或者多个线程在其他线程完成一些操作前处于等待状态。它的内部实现中用到了AQS。
一个例子
先来看一个使用CountDownLatch的例子:
public class CountDownLatchTest {
public static void main(String[] args) {
CountDownLatch cdl = new CountDownLatch(10);
for(int i=0;i<10;i++){
int finalI = i;
Thread thread = new Thread(() -> {
System.out.println("线程"+Thread.currentThread().getName()+"开始执行");
try {
Thread.sleep(finalI *1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
cdl.countDown();
},"thread_"+i);
thread.start();
}
long awaitStartTime = System.currentTimeMillis();
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(System.currentTimeMillis()-awaitStartTime+"秒之后"+"main方法开始执行");
}
}
在这个例子中,主线程调用CountDownLatch的await方法来等待,它等待的是10个线程完成他们的睡眠(睡眠用来替代实际业务中的执行逻辑)。十个线程中的每一个完成睡眠操作之后,都需要调用CountDownLatch的countDown方法。
因为最后一个线程需要睡眠9秒钟,所以大概九秒钟之后main线程会被唤醒继续执行。
重要方法与实现逻辑
从上面的例子可以看到CountDownLatch三个很重要的方法,第一个是构造方法,第二个是await方法,第三个是countDown方法。我们一一分析这三个方法:
构造方法
构造方法的逻辑如下:
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
传入的参数count就代表在执行await方法的线程返回前,需要执行countDown方法的次数。
我们看它实际上是调用了内部的同步器的构造方法:
Sync(int count) {
setState(count);
}
这个同步器的构造方法中直接将同步器的状态设置为前面给的参数,相当于锁被持有的数量,要想await方法返回,必须解锁相应的次数。
await方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
await方法实际上是调用的同步器的acquireSharedInterruptibly()方法,也就是说,await是一个请求锁的过程。我们来请求锁的逻辑:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
tryAcquireShared方法在Sync给出了实现:
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
很简单,就是判断state是否等于0。因为我们初始化的时候,state已经被设置成了给定的参数,不为零,所以会调用doAcquireSharedInterruptibly方法,这个方法我们已经讲过,它会将当前线程构造成一个Node放入到等待队列中,然后当前线程会通过调用LockSupport.park方法处于等待状态。
countDown方法
既然await方法是请求锁的操作,那么countDown就是释放锁的操作了:
public void countDown() {
sync.releaseShared(1);
}
它调用的是同步器的releaseShared方法:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared方法在Sync给出了实现:
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
需要注意的点
执行await的多个线程会一起唤醒
从上面我们看到,不管是加锁还是获取锁,都是跟shared相关的,这意味着执行请求锁操作也就是执行await方法的线程,他们之间是不会相互阻塞的,只要countDown方法执行的次数达到数量,这些线程都会被唤醒,看下面的例子:
public class CountDownLatchTest {
public static void main(String[] args) {
CountDownLatch cdl = new CountDownLatch(10);
for(int i=0;i<10;i++){
int finalI = i;
Thread thread = new Thread(() -> {
System.out.println("线程"+Thread.currentThread().getName()+"开始执行");
try {
Thread.sleep(finalI *1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
cdl.countDown();
},"thread_"+i);
thread.start();
}
Thread threadCopyMain = new Thread(new Runnable() {
@Override
public void run() {
long l = System.currentTimeMillis();
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(System.currentTimeMillis()-l+"毫秒后"+"copyMain方法开始执行");
}
},"copyMain");
threadCopyMain.start();
long awaitStartTime = System.currentTimeMillis();
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(System.currentTimeMillis()-awaitStartTime+"毫秒后"+"main方法开始执行");
}
}
主线程和threadCopyMain这个线程都执行了await方法,在10个线程执行了countDown之后,这两个线程都会继续执行,输出如下:
线程thread_0开始执行
线程thread_2开始执行
线程thread_1开始执行
线程thread_4开始执行
线程thread_3开始执行
线程thread_5开始执行
线程thread_6开始执行
线程thread_7开始执行
线程thread_8开始执行
线程thread_9开始执行
9001毫秒后main方法开始执行
9000毫秒后copyMain方法开始执行
可以看出,两者差不多同时执行。因为releaseShared方法会唤醒后续的处于共享状态节点对应的线程。
countDown方法执行的数量不一定是线程的数量
当countDown方法执行到构造方法中给定的数量,等待的线程就会被唤醒,所以对于上面的例子,如果是一个线程执行countDown方法十次,也会有同样的效果,不一定要十个线程每个执行一次countDown方法,看下面的例子:
public class CountDownLatchTest {
public static void main(String[] args) {
CountDownLatch cdl = new CountDownLatch(10);
for(int i=0;i<5;i++){
int finalI = i;
Thread thread = new Thread(() -> {
System.out.println("线程"+Thread.currentThread().getName()+"开始执行");
try {
Thread.sleep(finalI *1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
cdl.countDown();
cdl.countDown();
},"thread_"+i);
thread.start();
}
long awaitStartTime = System.currentTimeMillis();
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(System.currentTimeMillis()-awaitStartTime+"毫秒后"+"main方法开始执行");
}
}
CountDownLatch构造方法中的数量仍然是10,但是不再是十个线程分别调用countDown,而是五个线程每个线程调用两次,输出如下:
线程thread_0开始执行
线程thread_1开始执行
线程thread_2开始执行
线程thread_3开始执行
线程thread_4开始执行
4000毫秒后main方法开始执行
在实际使用中,这种逻辑可能并不存在,一般都是每个线程做一定的工作然后执行countDown。
小结
CountDownLatch在很多场景中都会用到,例如主线程等待多个线程,每个线程完成自己的部分之后,主线程再执行。CountDownLatch构造方法中的参数,也就是countDown需要调用的次数在初始化之后就不能再修改。下一篇文章,我们来看CyclicBarrier。