本文转载自https://maimai.cn/article/detail?fid=1502223322&efid=aQyPbaZDu0yajsTMeHXAGg&use_rn=1
如果看过《Java编程思想》的话,你一定对以下代码不陌生:
synchronized (lock) {
while (check pass) {
lock.wait();
}
// do your business
}
那么问题是为啥这里是 while
而不是 if
呢?这个问题我最开始也想了很久,按理说已经在 synchronized
块里面了嘛,已经很安全了。这个我以前也一直是这么认为的,知道最近看了一个Stackoverflow 上的问题才对这个问题有了比较深入的理解。
我们试想有一个有界的阻塞队列,参考JDK源码中的 ArrayBlockingQueue
,我们稍作改动,于是有了一下代码:
public class ArrayListBlockingQueue {
private final int MAX = 5;
private final Object lock = new Object();
private final ArrayList<Integer> list = new ArrayList<>();
/**
* 添加元素,当队列满了的时候,就阻塞
* @param entry
* @throws InterruptedException
*/
public void put(int entry) throws InterruptedException {
synchronized (lock) {
if (this.size() == MAX)
lock.wait();
list.add(entry);
lock.notifyAll();
}
}
/**
* 出队,当队列为空的时候就阻塞
* @return
* @throws InterruptedException
*/
public int take() throws InterruptedException {
synchronized (lock) { // line 0
if (this.size() == 0) { // line 1
lock.wait(); // line2
// line 3
}
int entry = list.remove(0); // line 4
lock.notifyAll(); // line 5
return entry;
}
}
public int size() {
synchronized (lock) {
return list.size();
}
}
}
注意到这里用的 if
,那么我们来看看它会有什么问题呢?下面的代码用了一个线程来 put
,10个线程来 take
public static void main(String[] args) throws InterruptedException {
ArrayListBlockingQueue queue = new ArrayListBlockingQueue();
ExecutorService es = Executors.newFixedThreadPool(11);
es.execute(() -> {
while (true) {
try {
queue.put(1);
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
});
for (int i = 0; i < 10; i++) {
es.execute(() -> {
while (true ) {
try {
queue.take();
Thread.sleep(10);
}
catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
});
}
es.shutdown();
es.awaitTermination(1, TimeUnit.DAYS);
}
很快这段代码就会报错:
Exception in thread “pool-1-thread-2” Exception in thread “pool-1-thread-11” Exception in thread “pool-1-thread-10” java.lang.IndexOutOfBoundsException: Index 0 out of bounds for length 0 at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64) at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70) at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
at java.base/java.util.Objects.checkIndex(Objects.java:372)
at java.base/java.util.ArrayList.remove(ArrayList.java:536)
at com.baotai.common.demo.ArrayListBlockingQueue.take(ArrayListBlockingQueue.java:40)
at com.baotai.common.demo.Demo.lambda$main$1(Demo.java:30)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
很明显,在remove方法中报错了。那么我们来分析下:
假设现在有A,B两个线程来执行 take
操作,我们假设发生如下步骤:
- A 拿到了锁 line 0;
- A 发现 size == 0 (line 1),然后进入等待并释放锁 (line 2);
- 此时 B 拿到了锁 (line 0),发现 size == 0 (line 1),也进入等待并释放锁 (line 2);
- 这个时候有个线程 C 往队列里面加了个元素1,然后 notifyAll 所有等待的线程都被唤醒;
- A B 重新获取锁,假设又是 A 先拿刀锁,然后它走到了 line 3,移除了一个元素 (line 4),没有问题;
- A 移除元素后调用了 notifyAll (line 5),这个时候把 B 给唤醒了,那么 B接着往下走;
- 这个时候 B 就出问题了,因为其实此时的 size 依旧为0,但是 B 不会再去做 size == 0 的判断了,而是直接走到了下面的 remove 代码,结果就抛了异常了。
那么 fix 很简单,将原先的 if
改为 while
就好了:
public int take() throws InterruptedException {
synchronized (lock) { // line 0
while (this.size() == 0) { // line 1
lock.wait(); // line2
// line 3
}
int entry = list.remove(0); // line 4
lock.notifyAll(); // line 5
return entry;
}
}
同样的道理, put
里面用 if
也是不行的。
我们可以尝试用10个线程去 put
,并且打印当前队列的 size
,用一个线程去 take
,你会发现队列的元素数量会超出最大限制5。
public static void main(String[] args) throws InterruptedException {
ArrayListBlockingQueue queue = new ArrayListBlockingQueue();
ExecutorService es = Executors.newFixedThreadPool(11);
for (int i = 0; i < 10; i++) {
es.execute(() -> {
while (true ) {
try {
queue.put(1);
System.out.println(queue.size());
Thread.sleep(20);
}
catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
});
}
es.execute(() -> {
while (true) {
try {
queue.take();
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
});
es.shutdown();
es.awaitTermination(1, TimeUnit.DAYS);
}
输出结果:
我们会发现,虽然我在put方法中会判断 size == 5
,如果等于5就会阻塞,但是输出结果却并不如我们所愿。有人可能会说将判断条件 size == 5
改成 size >= 5
就好,但实际上输出结果依旧会超过5。
我们来分析一下上面结果的原因:
假设现在队列中已经有5个元素了,现在有 A,B 两个线程调用了put方法,于是两个线程都在等待,接着有个线程 C 调用了 take 方法,将队列中的元素移除一个并notifyAll 所有等待的线程被唤醒。此时队列中的元素有4个,A B线程解除等待之后不会再判断 size == 0,于是都走下面的 add 代码,两个线程都 add 之后,队列中的元素数量就会有6个了。解决办法也是将 if
改成 while