本文转载自https://maimai.cn/article/detail?fid=1502223322&efid=aQyPbaZDu0yajsTMeHXAGg&use_rn=1

    如果看过《Java编程思想》的话,你一定对以下代码不陌生:

    1. synchronized (lock) {
    2. while (check pass) {
    3. lock.wait();
    4. }
    5. // do your business
    6. }

    那么问题是为啥这里是 while 而不是 if 呢?这个问题我最开始也想了很久,按理说已经在 synchronized 块里面了嘛,已经很安全了。这个我以前也一直是这么认为的,知道最近看了一个Stackoverflow 上的问题才对这个问题有了比较深入的理解。

    我们试想有一个有界的阻塞队列,参考JDK源码中的 ArrayBlockingQueue ,我们稍作改动,于是有了一下代码:

    1. public class ArrayListBlockingQueue {
    2. private final int MAX = 5;
    3. private final Object lock = new Object();
    4. private final ArrayList<Integer> list = new ArrayList<>();
    5. /**
    6. * 添加元素,当队列满了的时候,就阻塞
    7. * @param entry
    8. * @throws InterruptedException
    9. */
    10. public void put(int entry) throws InterruptedException {
    11. synchronized (lock) {
    12. if (this.size() == MAX)
    13. lock.wait();
    14. list.add(entry);
    15. lock.notifyAll();
    16. }
    17. }
    18. /**
    19. * 出队,当队列为空的时候就阻塞
    20. * @return
    21. * @throws InterruptedException
    22. */
    23. public int take() throws InterruptedException {
    24. synchronized (lock) { // line 0
    25. if (this.size() == 0) { // line 1
    26. lock.wait(); // line2
    27. // line 3
    28. }
    29. int entry = list.remove(0); // line 4
    30. lock.notifyAll(); // line 5
    31. return entry;
    32. }
    33. }
    34. public int size() {
    35. synchronized (lock) {
    36. return list.size();
    37. }
    38. }
    39. }

    注意到这里用的 if ,那么我们来看看它会有什么问题呢?下面的代码用了一个线程来 put ,10个线程来 take

    1. public static void main(String[] args) throws InterruptedException {
    2. ArrayListBlockingQueue queue = new ArrayListBlockingQueue();
    3. ExecutorService es = Executors.newFixedThreadPool(11);
    4. es.execute(() -> {
    5. while (true) {
    6. try {
    7. queue.put(1);
    8. Thread.sleep(20);
    9. } catch (InterruptedException e) {
    10. e.printStackTrace();
    11. break;
    12. }
    13. }
    14. });
    15. for (int i = 0; i < 10; i++) {
    16. es.execute(() -> {
    17. while (true ) {
    18. try {
    19. queue.take();
    20. Thread.sleep(10);
    21. }
    22. catch (InterruptedException e) {
    23. e.printStackTrace();
    24. break;
    25. }
    26. }
    27. });
    28. }
    29. es.shutdown();
    30. es.awaitTermination(1, TimeUnit.DAYS);
    31. }

    很快这段代码就会报错:

    Exception in thread “pool-1-thread-2” Exception in thread “pool-1-thread-11” Exception in thread “pool-1-thread-10” java.lang.IndexOutOfBoundsException: Index 0 out of bounds for length 0 at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64) at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70) at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)

    1. at java.base/java.util.Objects.checkIndex(Objects.java:372)
    2. at java.base/java.util.ArrayList.remove(ArrayList.java:536)
    3. at com.baotai.common.demo.ArrayListBlockingQueue.take(ArrayListBlockingQueue.java:40)
    4. at com.baotai.common.demo.Demo.lambda$main$1(Demo.java:30)
    5. at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    6. at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    7. at java.base/java.lang.Thread.run(Thread.java:834)

    很明显,在remove方法中报错了。那么我们来分析下:

    假设现在有A,B两个线程来执行 take 操作,我们假设发生如下步骤:

    1. A 拿到了锁 line 0;
    2. A 发现 size == 0 (line 1),然后进入等待并释放锁 (line 2);
    3. 此时 B 拿到了锁 (line 0),发现 size == 0 (line 1),也进入等待并释放锁 (line 2);
    4. 这个时候有个线程 C 往队列里面加了个元素1,然后 notifyAll 所有等待的线程都被唤醒;
    5. A B 重新获取锁,假设又是 A 先拿刀锁,然后它走到了 line 3,移除了一个元素 (line 4),没有问题;
    6. A 移除元素后调用了 notifyAll (line 5),这个时候把 B 给唤醒了,那么 B接着往下走;
    7. 这个时候 B 就出问题了,因为其实此时的 size 依旧为0,但是 B 不会再去做 size == 0 的判断了,而是直接走到了下面的 remove 代码,结果就抛了异常了。

    那么 fix 很简单,将原先的 if 改为 while 就好了:

    1. public int take() throws InterruptedException {
    2. synchronized (lock) { // line 0
    3. while (this.size() == 0) { // line 1
    4. lock.wait(); // line2
    5. // line 3
    6. }
    7. int entry = list.remove(0); // line 4
    8. lock.notifyAll(); // line 5
    9. return entry;
    10. }
    11. }

    同样的道理, put 里面用 if 也是不行的。
    我们可以尝试用10个线程去 put ,并且打印当前队列的 size ,用一个线程去 take ,你会发现队列的元素数量会超出最大限制5。

    1. public static void main(String[] args) throws InterruptedException {
    2. ArrayListBlockingQueue queue = new ArrayListBlockingQueue();
    3. ExecutorService es = Executors.newFixedThreadPool(11);
    4. for (int i = 0; i < 10; i++) {
    5. es.execute(() -> {
    6. while (true ) {
    7. try {
    8. queue.put(1);
    9. System.out.println(queue.size());
    10. Thread.sleep(20);
    11. }
    12. catch (InterruptedException e) {
    13. e.printStackTrace();
    14. break;
    15. }
    16. }
    17. });
    18. }
    19. es.execute(() -> {
    20. while (true) {
    21. try {
    22. queue.take();
    23. Thread.sleep(10);
    24. } catch (InterruptedException e) {
    25. e.printStackTrace();
    26. break;
    27. }
    28. }
    29. });
    30. es.shutdown();
    31. es.awaitTermination(1, TimeUnit.DAYS);
    32. }

    输出结果:
    Snipaste_2020-10-19_13-58-12.png
    我们会发现,虽然我在put方法中会判断 size == 5 ,如果等于5就会阻塞,但是输出结果却并不如我们所愿。有人可能会说将判断条件 size == 5 改成 size >= 5 就好,但实际上输出结果依旧会超过5。
    我们来分析一下上面结果的原因:
    假设现在队列中已经有5个元素了,现在有 A,B 两个线程调用了put方法,于是两个线程都在等待,接着有个线程 C 调用了 take 方法,将队列中的元素移除一个并notifyAll 所有等待的线程被唤醒。此时队列中的元素有4个,A B线程解除等待之后不会再判断 size == 0,于是都走下面的 add 代码,两个线程都 add 之后,队列中的元素数量就会有6个了。解决办法也是将 if 改成 while