1 传统的多线程通信

synchronized的通信锁机制

synchronized+lockObj.wait()+lockObj.notify()

概览

03 多线程通信 - 图1

lockObj.wait()方法

在当前线程中调用方法:lockObj.wait()会使当前线程进入等待(某对象)状态 ,令当前线程挂起并放弃CPU、同步资源并等待,使别的线程可访问并修改共享资源,而当前线程排队等候其他线程调用notify()或notifyAll()方法唤醒,唤醒后等待重新获得对监视器的所有权后才能继续执行。
调用方法的必要条件:当前线程必须具有对该对象的监控权(加锁),调用此方法后,当前线程将释放对象监控权 ,然后进入等待,在当前线程被notify后,要重新获得监控权,然后从断点处继续代码的执行。

lockObj.notify()/notifyAll()方法

在当前线程中调用方法: 共享资源.notify()。
功能:唤醒等待该对象监控权的一个/所有线程。 调用方法的必要条件:当前线程必须具有对该对象的监控权(加锁)。本线程如果还有代码要执行,则会等本线程交出同步监视器后再唤醒其他线程。The awakened thread will not be able to proceed until the current thread relinquishes the lock on this object.
notify():唤醒正在排队等待同步资源的线程中优先级最高者结束等待。
notifyAll():唤醒正在排队等待资源的所有线程结束等待。

wait()、notify()、notifyAll()这三个方法只有在synchronized方法或synchronized代码块中才能使用,否则会报java.lang.IllegalMonitorStateException异常。因为这三个方法必须有锁对象调用,而任意对象都可以作为synchronized的同步锁,因此这三个方法只能在Object类中声明。

代码

  1. class Ecoco10ApplicationTests {
  2. private Object object = new Object();
  3. @Test
  4. void contextLoads() throws InterruptedException {
  5. Thread thread = new Thread() {
  6. @SneakyThrows
  7. @Override
  8. public void run() {
  9. synchronized (object) {
  10. System.out.println("线程1" + System.currentTimeMillis());
  11. int i = 0;
  12. while (i <= 10) {
  13. if (i == 2)
  14. object.wait();//不能使用this.wait();而是同步资源对象调用wait()方法以阻塞在其身上的线程。
  15. System.out.println("线程A" + i);
  16. i++;
  17. }
  18. object.notify();//通知唤醒其他线程:在当前线程释放了同步监视器之后。
  19. }
  20. }
  21. };
  22. Thread thread2 = new Thread() {
  23. @SneakyThrows
  24. @Override
  25. public void run() {
  26. synchronized (object) {
  27. int j = 0;
  28. while (j <= 10) {
  29. System.out.println(" 线程B" + j);
  30. if (j == 5)
  31. object.notify();//不会立马就会唤醒其他线程,而是会继续执行本线程,直到交出同步监视器。
  32. if (j == 8)
  33. object.wait();
  34. j++;
  35. }
  36. object.notify();
  37. }
  38. }
  39. };
  40. thread.start();
  41. thread2.start();
  42. Thread.sleep(50000);
  43. }
  44. }
  45. //打印输出
  46. 线程A0
  47. 线程A1
  48. 线程B0
  49. 线程B1
  50. 线程B2
  51. 线程B3
  52. 线程B4
  53. 线程B5
  54. 线程B6
  55. 线程B7
  56. 线程B8
  57. 线程A2
  58. 线程A3
  59. 线程A4
  60. 线程A5
  61. 线程A6
  62. 线程A7
  63. 线程A8
  64. 线程A9
  65. 线程A10
  66. 线程B9
  67. 线程B10

使用Lock的通信锁机制

lock+ await+ signal来实现。可参考lock一节或生产者和消费者问题一节

线程中断机制

中断机制一些概念

  1. 一个线程不应该由其他线程来强制中断或停止,而是应该由线程自己自行停止。所以,Thread.stop、Thread.suspend、Thread. resume都已经被废弃了。
  2. 在Java中没有办法立即停止一条线程,然而停止线程却显得尤为重要,如取消一个耗时操作。因此,Java提供了一种用于停止线程的机制—中断协商机制。中断只是一种协商机制,Java没有给中断增加任何语法,中断的过程完全需要程序员自己实现。
  3. 若要中断一个线程,我们需要自己手动调用该线程的interrupt()方法,该方法也仅仅是将线程对象的中断标识设为true。需要时刻谨记的:中断只是一种协同机制,调用interrupt()和interrupted()修改的只是中断标识位而已,而不是立即stop打断。
  4. 每个线程对象中都有一个标识,用于标识线程是否被中断;该标识位为true表示中断,为false表示未中断;通过调用线程对象的interrupt()方法将线程的标识位设为true;该方法可以在别的线程中调用,也可以在自己的线程中调用。

参考文档:什么是中断

三种方式实现优雅的线程中断

  1. 通过一个volatile变量实现
  2. 通过AtomicBoolean
  3. 通过Thread类自带的中断Interrupt机制AP方法实现,即isInterrupted()和interrupt()方法 ```java package com.fly.ecoco10;

import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean;

public class InterruptDemo { static volatile boolean isStop = false; static AtomicBoolean atomicBoolean = new AtomicBoolean(false);

  1. public static void main(String[] args) {
  2. useByVolatile();//通过一个volatile变量实现
  3. useByAtomicBoolean();//通过AtomicBoolean
  4. useByInterrupt();//通过Interrupt中断机制
  5. }
  6. /**
  7. * 方式一:通过一个volatile变量实现
  8. */
  9. public static void useByVolatile() {
  10. new Thread(() -> {
  11. while (true) {
  12. if (isStop) {
  13. System.out.println("-----isStop = true,程序结束。");
  14. break;
  15. }
  16. System.out.println("------hello isStop");
  17. }
  18. }, "t1").start();
  19. //暂停几秒钟线程
  20. try {
  21. TimeUnit.SECONDS.sleep(1);
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. new Thread(() -> {
  26. isStop = true;
  27. }, "t2").start();
  28. }
  29. /**
  30. * 方式二:通过AtomicBoolean
  31. */
  32. public static void useByAtomicBoolean() {
  33. new Thread(() -> {
  34. while (true) {
  35. if (atomicBoolean.get()) {
  36. System.out.println("-----atomicBoolean.get() = true,程序结束。");
  37. break;
  38. }
  39. System.out.println("------hello atomicBoolean");
  40. }
  41. }, "t1").start();
  42. //暂停几秒钟线程
  43. try {
  44. TimeUnit.SECONDS.sleep(1);
  45. } catch (InterruptedException e) {
  46. e.printStackTrace();
  47. }
  48. new Thread(() -> {
  49. atomicBoolean.set(true);
  50. }, "t2").start();
  51. }
  52. /**
  53. * 方式三:通过Interrupt中断机制来实现
  54. */
  55. public static void useByInterrupt() {
  56. Thread t1 = new Thread(() -> {
  57. while (true) {
  58. if (Thread.currentThread().isInterrupted()) {
  59. System.out.println("-----isInterrupted() = true,程序结束。");
  60. break;
  61. }
  62. System.out.println("------hello Interrupt");
  63. }
  64. }, "t1");
  65. t1.start();
  66. try {
  67. TimeUnit.SECONDS.sleep(1);
  68. } catch (InterruptedException e) {
  69. e.printStackTrace();
  70. }
  71. new Thread(() -> {
  72. t1.interrupt();//修改t1线程的中断标志位为true
  73. }, "t2").start();
  74. }

}

  1. **关于Interrupt中断机制的方法说明**
  2. > **实例方法:void interrupt( )**
  3. - `interrupt( )`仅仅是设置线程的中断状态未true,不会停止线程,真正决定是否终止线程必须由线程内部自己决定。
  4. - 着重说明:线程如果因为调用了wait()、join()、sleep()方法而被阻塞时候,此时调用了实例的interupt()方法,这时候就会抛出InterruptedException,并会将中断标识位清空(即设置为false)。解决:在catch里面再调用一次interrupt()方法以将中断状态置true
  5. ```java
  6. public class JUCTest {
  7. public static void main(String[] args) {
  8. Thread thread = new Thread() {
  9. @Override
  10. public void run() {
  11. super.run();
  12. while (true) {
  13. //这里不会中断,因为线程处于Sleep的时候调用了interrupt,此时会报错,并将中转状态置为空,即默认的不中断,所以if判断失败
  14. if (Thread.currentThread().isInterrupted()) {
  15. System.out.println("中断输出");
  16. break;
  17. }
  18. try {
  19. TimeUnit.MILLISECONDS.sleep(500);
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. System.out.println("111");
  24. }
  25. }
  26. };
  27. thread.start();
  28. try {
  29. TimeUnit.MILLISECONDS.sleep(100);
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. }
  33. thread.interrupt();
  34. }
  35. }
  36. //演示说明,会一直输出,而不会停止
  37. java.lang.InterruptedException: sleep interrupted
  38. at java.lang.Thread.sleep(Native Method)
  39. at java.lang.Thread.sleep(Thread.java:340)
  40. at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
  41. at com.efly.gulimall.gulimallproduct.JUCTest$1.run(JUCTest.java:20)
  42. 111
  43. 111
  44. 111
  45. 111
  46. 111
  47. //解决方案
  48. public class JUCTest {
  49. public static void main(String[] args) {
  50. Thread thread = new Thread() {
  51. @Override
  52. public void run() {
  53. super.run();
  54. while (true) {
  55. if (Thread.currentThread().isInterrupted()) {
  56. System.out.println("中断输出");
  57. break;
  58. }
  59. try {
  60. TimeUnit.MILLISECONDS.sleep(500);
  61. } catch (InterruptedException e) {
  62. //只需在这里加一句即可
  63. Thread.currentThread().interrupt();
  64. e.printStackTrace();
  65. }
  66. System.out.println("111");
  67. }
  68. }
  69. };
  70. thread.start();
  71. try {
  72. TimeUnit.MILLISECONDS.sleep(100);
  73. } catch (InterruptedException e) {
  74. e.printStackTrace();
  75. }
  76. thread.interrupt();
  77. }
  78. }
  79. //结果演示
  80. java.lang.InterruptedException: sleep interrupted
  81. at java.lang.Thread.sleep(Native Method)
  82. at java.lang.Thread.sleep(Thread.java:340)
  83. at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
  84. at com.efly.gulimall.gulimallproduct.JUCTest$1.run(JUCTest.java:22)
  85. 111
  86. 中断输出

实例方法:boolean isInterrupted( )

判断当前线程是否被中断(通过检查中断标识位) 实例方法。

静态方法:static boolean interrupted( )

判断线程是否被中断,并清除当前中断状态,这个方法做了两件事:

  • 返回当前线程的中断状态
  • 将当前线程的中断状态设为false,类似于取消中断标识位 ```java System.out.println(Thread.currentThread().getName()+”—-“+Thread.interrupted()); System.out.println(Thread.currentThread().getName()+”—-“+Thread.interrupted()); System.out.println(“111111”); Thread.currentThread().interrupt();///——false—-> true System.out.println(“222222”); System.out.println(Thread.currentThread().getName()+”—-“+Thread.interrupted()); System.out.println(Thread.currentThread().getName()+”—-“+Thread.interrupted());

//输出结果 main—-false//先输出中断标识位即默认值为false,并将中断标识为设置为false。类似于取消中断 main—-false// 111111 222222 main—-true//实例方法:将中断标识位改为true main—-false//静态方法:将中断标识位改为false

  1. <a name="gYEDv"></a>
  2. # 2 LockSupport
  3. > **概述说明**
  4. 参考文档:[LockSupport的使用](https://blog.csdn.net/TZ845195485/article/details/118404659)
  5. 1. 通过`park()`和`unpark(thread)`方法来实现阻塞当前线程和唤醒指定线程的操作。
  6. 1. LockSupport是一个线程阻塞工具类,所有的方法都是静态方法,可以让线程在任意位置阻塞,阻塞之后也有对应的唤醒方法。
  7. 1. 归根结底,LockSupport调用的Unsafe中的native代码。
  8. 1. 官方对于LockSupport的描述及核心两个方法的源码解读
  9. ```java
  10. Basic thread blocking primitives for creating locks and other synchronization classes.This class associates, with each thread that uses it, a permit (in the sense of the Semaphore class). A call to park will return immediately if the permit is available, consuming it in the process; otherwise it may block. A call to unpark makes the permit available, if it was not already available. (Unlike with Semaphores though, permits do not accumulate. There is at most one.)
  11. LockSupport是用来创建锁和其他同步类的基本线程阻塞原语,LockSupport类使用了一种名为Permit(许可)的概念来做到阻塞和唤醒线程的功能,每个线程都有一个许可(permit),permit只有两个值1和零,默认是零.可以把许可看成是一种(0,1)信号量(Semaphore),但与Semaphore不同的是,许可的累加上限是1。
  12. ////源码解读////
  13. //java.util.concurrent.locks.LockSupport类
  14. package java.util.concurrent.locks;
  15. import sun.misc.Unsafe;
  16. public class LockSupport {
  17. public static void park() {
  18. UNSAFE.park(false, 0L);
  19. }
  20. public static void unpark(Thread thread) {
  21. if (thread != null)
  22. UNSAFE.unpark(thread);
  23. }
  24. }
  25. //sun.misc.Unsafe类
  26. package sun.misc;
  27. public final class Unsafe {
  28. public native void unpark(Object var1);
  29. public native void park(boolean var1, long var2);
  30. }

使用LockSupport解决的痛点

  • LockSupport不用持有锁块,不用加锁,程序性能好
  • 先后顺序:不容易导致卡死(因为unpark获得了一个凭证,之后再调用park方法,就可以名正言顺的凭证消费,故不会阻塞)

    使用:park()和unpark(threadObj)

park():先判断permit是否为1,如果是的话则不阻塞,直接继续执行,并将permit置为0。否则一直阻塞,直到被唤醒(其他线程调用了unpark)或被中断。由于permit默认是0,所以一开始调用park()方法,当前线程就会阻塞,直到别的线程将当前线程的permit设置为1时, park方法才会被唤醒然后会将permit再次设置为0并返回。If the permit is available then it is consumed and the call returns immediately; otherwise the current thread becomes disabled for thread scheduling purposes and lies dormant until one ofthree things happens:Some other thread invokes unpark with the current thread as the target; or Some other thread interrupts the current thread; or The call spuriously (that is, for no
reason) returns.
小案例

  1. package com.fly.ecoco10;
  2. import java.util.concurrent.TimeUnit;
  3. import java.util.concurrent.locks.LockSupport;
  4. public class LockSupportDemo {
  5. public static void main(String[] args) {
  6. System.out.println("before当前时间:" + System.currentTimeMillis());
  7. LockSupport.unpark(Thread.currentThread());//如果先调用unpark则会将permit置为1,此时再调用park()就不会阻塞了
  8. LockSupport.park();//判断permit是否O,如果是的话会阻塞直到别的线程将该线程的permit置为1,然后消费permit并解除阻塞从而继续执行
  9. System.out.println("after当前时间:" + System.currentTimeMillis());
  10. }
  11. }
  12. //效果
  13. before当前时间:1644562613283
  14. after当前时间:1644562613284

unpark(threadObj):唤醒处于阻塞状态的指定线程,调用unpark(threadObj)方法后,就会将thread线程的许可permit设置成1(注意多次调用unpark方法,不会累加,pemit值还是1),其会自动唤醒theadObj线程,即之前阻塞中的LockSupport.park()方法会立即返回。
小案例

  1. public class LockSupportDemo {
  2. public static void main(String[] args) throws InterruptedException{
  3. //线程1
  4. Thread thread1 = new Thread(){
  5. @SneakyThrows
  6. @Override
  7. public void run() {
  8. System.out.println("进入线程1,当前时间:" + System.currentTimeMillis());
  9. LockSupport.park();
  10. System.out.println("线程1第一次被unpark,当前时间:" + System.currentTimeMillis());
  11. TimeUnit.MILLISECONDS.sleep(20);//休眠20毫秒
  12. LockSupport.park();
  13. System.out.println("线程1第二次被unpark了,当前时间:" + System.currentTimeMillis());
  14. }
  15. };
  16. thread1.start();
  17. TimeUnit.MILLISECONDS.sleep(20);//主线程休眠20毫秒
  18. //线程2
  19. Thread thread2 = new Thread() {
  20. @SneakyThrows
  21. @Override
  22. public void run() {
  23. super.run();
  24. //调用unpark(threadObj)方法后,就会将threadObj线程的许可permit设置成1。threadObj从而自动解除锁定并消耗一个permit
  25. //注意多次调用unpark方法,不会累加,permit值还是1。
  26. LockSupport.unpark(thread1);
  27. TimeUnit.MILLISECONDS.sleep(10);//休眠10毫秒
  28. LockSupport.unpark(thread1);
  29. }
  30. };
  31. thread2.start();
  32. }
  33. }
  34. //结果
  35. 进入线程1,当前时间:1644563471703
  36. 线程1第一次被unpark,当前时间:1644563471723
  37. 线程1第二次被unpark了,当前时间:1644563471743

图解说明
image.png

面试题

Q:为什么可以先唤醒线程后阻塞线程?
A:因为unpark获得了一个凭证,之后再调用park方法,就可以名正言顺的凭证消费,故不会阻塞。
Q:为什么唤醒两次后阻塞两次,但最终结果还会阻塞线程?
A:因为凭证的数量最多为1,连续调用两次unpark和调用一次unpark效果一样,只会增加一个凭证;而调用两次park却需要消费两个凭证,证不够,不能放行。

3 阻塞队列BlockingQueue

概念

原有的问题

在同步机制基础板块我们已经看到了形成Java并发程序设计基础的底层构建块。然而,对于实际编程来说,应该尽可能远离底层结构。使用由并发处理的专业人士实现的较高层次的结构要方便得多、要安全得多。

为什么要用阻塞队列

对于许多线程问题,可以通过使用一个或多个队列以优雅且安全的方式将其形式化。 使用队列,可以安全的从一个线程向另一个线程传递数据 。 例如, 考虑银行转账程序, 转账线程将转账指令对象插入一个队列中 ,而不是直接访问银行对象。 另一个线程从队列中取出指令执行转账 。 只有该线程可以访问该银行对象的内部。 因此不需要同步(当然, 线程安全的队列类的实现者不能不考虑锁和条件, 但是 , 那是他们的问题而不是我们使用者的问题 )。在生产者-消费者的设计模式中,生产者线程向队列插人元素,消费者线程则取出它们。
简而言之:阻塞队列的好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都帮我们一手包办了。而在concurrent包发布以前,在多线程环境下,我们每个程序员都必须自己去控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。
参考文档:阻塞队列,另对于阻塞队列的使用可以和RabbitMQ结合学习。

使用

BlockingQueue接口及其实现类

java.util.concurrent.BlockingQueue阻塞队列是属于一个接口,底下有七个实现类,其中着重要掌握的是ArrayBlockQueue、LinkedBlockingQueue、SynchronousQueue。

  • ArrayBlockQueue:由数组结构组成的有界阻塞队列
  • LinkedBlockingQueue:由链表结构组成的有界(但是默认大小 Integer.MAX_VALUE)的阻塞队列(有界,但是界限非常大,相当于无界,可以当成无界使用)
  • SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列(生产一个,消费一个,不存储元素,不消费不生产)
  • PriorityBlockQueue:支持优先级排序的无界阻塞队列
  • DelayQueue:使用优先级队列实现的延迟无界阻塞队列
  • LinkedTransferQueue:由链表结构组成的无界阻塞队列
  • LinkedBlockingDeque:由链表结构组成的双向阻塞队列

    BlockingQueue的具体方法

阻塞队列方法分为3 类,其取决于当队列满或空时它们的响应方式。

  • 如果将队列当作线程管理工具来使用即阻塞与唤醒, 将要用到put和take方法。
  • 当试图向满的队列中添加或从空的队列中移出元素时,add 、remove和element操作抛出异常 。
  • 当然 , 在一个多线程程序中 , 队列会在任何时候空或满 , 因此, 一定要使用offer 、 poll和peek方法作为替代。 这些方法如果不能完成任务, 只是给出一个错误提示而不会抛出异常 。 | 方法分类 | 方法类型 | | | 说明 | | —- | —- | —- | —- | —- | | | 插入 | 移除 | 检查 | | | 安全推荐组 | offer(e) | poll() | peak() | 我们使用offer(e)方法,添加元素时候,如果阻塞队列满了后,会返回false,否者返回true
    同时在取poll()的时候,如果队列已空,那么会返回null
    peak():拿第一个元素但不移除,如果为空,则返回null | | 超时退出组 | offer(e,time,unit) | poll(time,unit) | | 当阻塞队列满时,队里会阻塞生产者线程一定时间,超过限时后生产者线程会退出。
    使用offer插入的时候,可以指定时间,如果2秒还没有插入,那么就放弃插入,同理poll方法 | | 阻塞唤醒组
    | put(e) | take() | / | 当阻塞队列满时,生产者继续往队列里put元素,队列会一直阻塞生产线程直到put数据or响应中断退出
    当阻塞队列空时,消费者线程试图从队列里take元素,队列会一直阻塞消费者线程直到队列可用 | | 抛出异常组 | add(e) | remove() | element() | 当阻塞队列满时:在往队列中add插入元素会抛出 IIIegalStateException:Queue full
    当阻塞队列空时:再往队列中remove移除元素,会抛出NoSuchException
    element():Retrieves, but does not remove, the head of this queue,如果队列为空,则会抛异常:NoSuchElementException |

offer和poll方法的使用案例,通用该案例我们可以显而易见的发现,在使用阻塞队列后,在多线程通信里面,阻塞队列封装好了一套解决方案。

  1. package com.fly.ecoco10;
  2. import java.util.concurrent.ArrayBlockingQueue;
  3. import java.util.concurrent.TimeUnit;
  4. public class BlockingQueueDemo3 {
  5. static ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(3);
  6. public static void main(String[] args) {
  7. new Thread() {
  8. @Override
  9. public void run() {
  10. super.run();
  11. try {
  12. System.out.println("进入:" + System.currentTimeMillis());
  13. //1秒=1000毫秒(MILLISECONDS);1毫秒=1000微秒(MICROSECONDS);1微秒=1000纳秒(NANOSECONDS)
  14. //System.currentTimeMillis()统计的是milliseconds,即毫秒。
  15. Object object = arrayBlockingQueue.poll(2, TimeUnit.SECONDS);//等待2秒,如果还没有元素则自动解除阻塞
  16. System.out.println("进入:" + System.currentTimeMillis());
  17. System.out.println("队列头节点:" + object);
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. }
  22. }.start();
  23. }
  24. }
  25. //效果
  26. 进入:1644479566544
  27. 结束:1644479568544
  28. 队列头节点:null
  29. //我们针对上面的方法做下改造:加一个子线程往队列里面添加数据
  30. public class BlockingQueueDemo3 {
  31. static ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(3);
  32. public static void main(String[] args) {
  33. new Thread() {
  34. @Override
  35. public void run() {
  36. super.run();
  37. try {
  38. System.out.println(System.currentTimeMillis());
  39. //1秒=1000毫秒(MILLISECONDS);1毫秒=1000微秒(MICROSECONDS);1微秒=1000纳秒(NANOSECONDS)
  40. //System.currentTimeMillis()统计的是milliseconds,即毫秒。
  41. //等待200毫秒,如果还没有元素则
  42. Object object = arrayBlockingQueue.poll(2, TimeUnit.SECONDS);//间隔2秒钟
  43. System.out.println(System.currentTimeMillis());
  44. System.out.println("队列头节点:" + object);
  45. } catch (InterruptedException e) {
  46. e.printStackTrace();
  47. }
  48. }
  49. }.start();
  50. //主线程休息10毫妙确保上面的线程先进入并阻塞
  51. try {
  52. TimeUnit.MILLISECONDS.sleep(10);
  53. } catch (InterruptedException e) {
  54. e.printStackTrace();
  55. }
  56. new Thread(() -> {
  57. try {
  58. //往队列里面添加数据
  59. arrayBlockingQueue.offer(1,2,TimeUnit.SECONDS);
  60. } catch (InterruptedException e) {
  61. e.printStackTrace();
  62. }
  63. }).start();
  64. }
  65. }
  66. //改造后的数据效果:已经拿到了其他线程插入队列的值(间隔在45毫秒)
  67. 进入:1644479786442
  68. 结束:1644479786487
  69. 队列头节点:1

SynchronousQueue类的使用

SynchronousQueue没有容量,与其他BlockingQueue不同,SynchronousQueue是一个不存储的BlockingQueue,每一个put操作必须等待一个take操作,否者不能继续添加元素。
下面我们测试SynchronousQueue添加元素的过程:一个生产者线程,一个消费者线程,我们从最后的运行结果可以看出,每次生产者线程向阻塞队列添加元素后就会阻塞等待消费者线程,消费着线程消费后就会处于挂起状态,等待生产者在生产数据,从而周而复始,形成 一存一取的状态。

  1. package com.fly.ecoco10;
  2. import java.util.concurrent.BlockingQueue;
  3. import java.util.concurrent.SynchronousQueue;
  4. import java.util.concurrent.TimeUnit;
  5. public class SynchronousQueueDemo {
  6. static BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
  7. public static void main(String[] args) {
  8. //生产的线程分别put了A、B、C这三个字段
  9. new Thread(() -> {
  10. try {
  11. System.out.println(Thread.currentThread().getName() + " put A 生产时间:" + System.currentTimeMillis());
  12. blockingQueue.put("A");
  13. System.out.println(Thread.currentThread().getName() + " put B 生产时间:" + System.currentTimeMillis());
  14. blockingQueue.put("B");
  15. System.out.println(Thread.currentThread().getName() + " put C 生产时间:" + System.currentTimeMillis());
  16. blockingQueue.put("C");
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. }, "prodThread").start();
  21. //消费线程使用take,消费阻塞队列中的内容,并且每次消费前,都等待5秒
  22. new Thread(() -> {
  23. try {
  24. TimeUnit.MILLISECONDS.sleep(10);
  25. blockingQueue.take();
  26. System.out.println(Thread.currentThread().getName() + " take A 消费时间:" + System.currentTimeMillis());
  27. System.out.println("");
  28. TimeUnit.MILLISECONDS.sleep(10);
  29. blockingQueue.take();
  30. System.out.println(Thread.currentThread().getName() + " take B 消费时间:" + System.currentTimeMillis());
  31. System.out.println("");
  32. TimeUnit.MILLISECONDS.sleep(10);
  33. blockingQueue.take();
  34. System.out.println(Thread.currentThread().getName() + " take B 消费时间:" + System.currentTimeMillis());
  35. } catch (InterruptedException e) {
  36. e.printStackTrace();
  37. }
  38. }, "conThread").start();
  39. }
  40. }
  41. //运行结果
  42. prodThread put A 生产时间:1644484873150
  43. conThread take A 消费时间:1644484873160
  44. prodThread put B 生产时间:1644484873160
  45. conThread take B 消费时间:1644484873170
  46. prodThread put C 生产时间:1644484873170
  47. conThread take B 消费时间:1644484873180
  48. //我们从最后的运行结果可以看出,每次生产者线程向阻塞队列添加元素后就会阻塞等待消费者线程,消费着线程消费后就会处于挂起状态,等待生产者在生产数据。
  49. //从而周而复始,形成 一存一取的状态。

应用:生产者消费者

详见生产者-消费者模式

4 CompletableFuture异步编排

概述

Future是Java 5添加的类,用来描述一个异步计算的结果。你可以使用isDone方法检查计算是否完成,或者使用get阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel方法停止任务的执行。

虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?

很多语言,比如Node.js,采用回调的方式实现异步编程。Java的一些框架,比如Netty,自己扩展了Java的 Future接口,提供了addListener等多个扩展方法;Google guava也提供了通用的扩展Future;Scala也提供了简单易用且功能强大的Future/Promise异步编程模式。
作为正统的Java类库,是不是应该做点什么,加强一下自身库的功能呢?
在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

核心概述CompleteableFuture(参考文档:线程池加异步编排

  1. 需求:主线程需要拿到子线程的执行结果(回调)、多个线程间的串行和并行。
  2. 1、使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待。
  3. 2、从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
  4. 3CompletableFutureFutureTask同属于Future接口的实现类,都可以获取线程的执行结果。
  5. 4、多个线程可以组合可以串行、并行调用。

使用:创建异步对象(runAsyn/supplyAsyn)

CompletableFuture 提供了四个静态方法来创建一个异步操作。

  1. /*
  2. 无Executor:没有指定Executor,则会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。
  3. 有Executor:则使用指定的线程池运行。
  4. runAsync:方法不支持返回值。
  5. supplyAsync:方法支持返回值。
  6. */
  7. public static CompletableFuture<Void> runAsync(Runnable runnable)
  8. public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
  9. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
  10. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

计算完成时的回调方法

包括:whenComplete/exceptionally/handle

whenComplete/exceptionally

当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。注:方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)。
主要是下面的方法:

  1. /*
  2. whenComplete可以处理正常和异常的计算结果,
  3. whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务,串行执行
  4. whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行,异步执行
  5. BiConsumer<? super T,? super Throwable>可以定义处理业务
  6. */
  7. public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action);
  8. public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,?
  9. super Throwable> action);
  10. public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,?
  11. super Throwable> action,
  12. Executor executor);
  13. //exceptionally处理异常情况
  14. public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn);

案例

  1. @Autowired
  2. private ThreadPoolExecutor executor;
  3. @RequestMapping("/update")
  4. public void update() throws Throwable {
  5. CompletableFuture future = CompletableFuture.supplyAsync((Supplier<Object>) () -> {
  6. //打印当前的线程名
  7. System.out.println(Thread.currentThread().getName());
  8. try {
  9. //线程先休眠3秒中
  10. Thread.sleep(3000);
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. //supplyAsync可以有返回值
  15. return 1024;
  16. }, executor).whenCompleteAsync((o, throwable) -> {
  17. //拿到上面线程执行的返回值
  18. System.out.println("-------o=" + o.toString());
  19. //打印输出线程名(如果使用的是Async则线程名可能不一致)
  20. System.out.println(Thread.currentThread().getName());
  21. //System.out.println("-------throwable=" + throwable);
  22. }, executor).exceptionally(throwable -> {
  23. //exceptionally:捕获异常
  24. System.out.println("throwable=" + throwable);
  25. return 6666;
  26. });
  27. //非阻塞方法
  28. System.out.println("Main");
  29. //阻塞方法
  30. System.out.println(future.get());
  31. }

执行结果
image.png

handle

handle 是执行任务完成时对结果的处理。handle 是在任务完成后再执行,还可以处理异常的任务。

  1. public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
  2. public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
  3. public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,
  4. Executor executor);

案例

  1. @RequestMapping("/update2")
  2. public void update2() throws InterruptedException, ExecutionException {
  3. CompletableFuture future = CompletableFuture.supplyAsync((Supplier<Object>) () -> {
  4. System.out.println(Thread.currentThread().getName());
  5. try {
  6. Thread.sleep(3000);
  7. } catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }
  10. return 1024;
  11. }, executor).handleAsync(new BiFunction<Object, Throwable, Object>() {
  12. @Override
  13. public Object apply(Object o, Throwable throwable) {
  14. //拿到返回值和异常
  15. System.out.println(Thread.currentThread().getName());
  16. return o;
  17. }
  18. @Override
  19. public <V> BiFunction<Object, Throwable, V> andThen(Function<? super Object, ? extends V> after) {
  20. System.out.println(Thread.currentThread().getName());
  21. return null;
  22. }
  23. },executor);
  24. }

handle 方法和whenComplete方法的区别

  1. 总体上,whenComplete方法和handle方法是类似的。
  2. 区别主要在于接收的参数:whenComplete接收的是BiConsumer,handler接收的是BiFunction
  3. 顾名思义,BiConsumer是直接消费的,而BiFunction是有返回值的,
  4. BiConsumer没有返回值,而BiFunction是有的;

线程串行化方法

包括:thenApply/thenAccept/thenRun。
注:带有Async默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。

  1. /*
  2. thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。
  3. */
  4. public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
  5. public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
  6. public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,
  7. Executor executor)
  8. Function<? super T,? extends U>
  9. T:上一个任务返回结果的类型
  10. U:当前任务的返回值类型
  11. /*
  12. thenAccept方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果
  13. */
  14. public CompletionStage<Void> thenAccept(Consumer<? super T> action);
  15. public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
  16. public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
  17. /*
  18. thenRun方法:只要上面的任务执行完成,就开始执行thenRun,
  19. 只是处理完任务后,执行thenRun的后续操作
  20. */
  21. public CompletionStage<Void> thenRun(Runnable action);
  22. public CompletionStage<Void> thenRunAsync(Runnable action);
  23. public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);

代码演示

  1. @RequestMapping("/update3")
  2. public void update3() throws ExecutionException, InterruptedException {
  3. CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
  4. @Override
  5. public Integer get() {
  6. System.out.println(Thread.currentThread().getName() + "\t completableFuture");
  7. //int i = 10 / 0;
  8. return 1024;
  9. }
  10. }).thenApply(new Function<Integer, Integer>() {
  11. @Override
  12. public Integer apply(Integer o) {
  13. System.out.println("thenApply方法,上次返回结果:" + o);
  14. return o * 2;
  15. }
  16. }).whenComplete(new BiConsumer<Integer, Throwable>() {
  17. @Override
  18. public void accept(Integer o, Throwable throwable) {
  19. System.out.println("-------o=" + o);
  20. System.out.println("-------throwable=" + throwable);
  21. }
  22. }).exceptionally(new Function<Throwable, Integer>() {
  23. @Override
  24. public Integer apply(Throwable throwable) {
  25. System.out.println("throwable=" + throwable);
  26. return 6666;
  27. }
  28. }).handle(new BiFunction<Integer, Throwable, Integer>() {
  29. @Override
  30. public Integer apply(Integer integer, Throwable throwable) {
  31. System.out.println("handle o=" + integer);
  32. System.out.println("handle throwable=" + throwable);
  33. return 8888;
  34. }
  35. });
  36. System.out.println(future.get());
  37. }

线程并行化方法

包括:both/either/allof/anyof

并行两任务必须都完成(both/combine)

两个任务必须都完成,触发该任务。

  1. thenCombine:组合两个future,获取两个future的返回结果,并返回当前任务的返回值
  2. thenAcceptBoth:组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值。
  3. runAfterBoth:组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务。

案例

  1. @RequestMapping("/update4")
  2. public void update4() {
  3. CompletableFuture.supplyAsync(() -> {
  4. try {
  5. System.out.println(Thread.currentThread().getId() +"&&"+ System.currentTimeMillis());
  6. Thread.sleep(5000);
  7. } catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }
  10. return "hello";
  11. }, executor).thenCombine(CompletableFuture.supplyAsync(() -> {
  12. try {
  13. System.out.println(Thread.currentThread().getId() +"&&"+ System.currentTimeMillis());
  14. Thread.sleep(5000);
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. return "world";
  19. }, executor), (t, u) -> {
  20. return t + u;
  21. }).whenComplete((t, u) -> {
  22. System.out.println(t);
  23. System.out.println(Thread.currentThread().getId() +"&&"+ System.currentTimeMillis());
  24. });
  25. }
  26. //注意:thenCombine是并行运行,其结果可知,supplyAsync和thenCombine的参数1是同时开始的,
  27. whenComplete是会等前面的CompletableFuture运行完成后再运行
  28. 98&&1629795033202
  29. 99&&1629795033202
  30. helloworld
  31. 98&&1629795038202

打印结果:hello world! CompletableFuture

并行两任务其中一个完成即可(either)

当两个任务中,任意一个future任务完成的时候,执行任务

  1. applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。
  2. acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
  3. runAfterEither:两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值。

案例

  1. @RequestMapping("/update3")
  2. public void update3() throws ExecutionException, InterruptedException {
  3. CompletableFuture.supplyAsync(() -> {
  4. try {
  5. Thread.sleep(20000);
  6. } catch (InterruptedException e) {
  7. e.printStackTrace();
  8. }
  9. return "hello";
  10. }).applyToEither(CompletableFuture.completedFuture("World"), (t) -> {
  11. return t;
  12. }).whenComplete((t, u) -> {
  13. System.out.println(t);
  14. });
  15. }
  16. //两个任务谁先处理完谁先执行either方法,最后输出结果
  17. World

并行多任务全部或其一

包括方法:allOf/anyOf

  1. //allOf:等待所有任务完成
  2. public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
  3. //anyOf:只要有一个任务完成
  4. public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);

案例

  1. @RequestMapping("/update3")
  2. public void update3() throws ExecutionException, InterruptedException {
  3. //1.0 待执行的任务列表
  4. CompletableFuture<String> stringCompletableFuture1 = CompletableFuture.supplyAsync(() -> {
  5. try {
  6. System.out.println(Thread.currentThread().getId() + "&&" + System.currentTimeMillis());
  7. Thread.sleep(5000);
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. return "hello";
  12. });
  13. CompletableFuture<String> stringCompletableFuture2 = CompletableFuture.supplyAsync(() -> {
  14. try {
  15. System.out.println(Thread.currentThread().getId() + "&&" + System.currentTimeMillis());
  16. Thread.sleep(5000);
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. return "world";
  21. });
  22. CompletableFuture<String> stringCompletableFuture3 = CompletableFuture.supplyAsync(() -> {
  23. try {
  24. System.out.println(Thread.currentThread().getId() + "&&" + System.currentTimeMillis());
  25. Thread.sleep(5000);
  26. } catch (InterruptedException e) {
  27. e.printStackTrace();
  28. }
  29. return "nice";
  30. });
  31. List<CompletableFuture<String>> completableFutures = Arrays.asList(stringCompletableFuture1, stringCompletableFuture2, stringCompletableFuture3);
  32. final CompletableFuture<Void> allCompleted = CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[]{}));
  33. //轮询得到返回值
  34. allCompleted.thenRun(() -> {
  35. completableFutures.stream().forEach(future -> {
  36. try {
  37. System.out.println("get future at:"+System.currentTimeMillis()+", result:"+future.get());
  38. } catch (InterruptedException | ExecutionException e) {
  39. e.printStackTrace();
  40. }
  41. });
  42. });
  43. }
  44. 结果:
  45. 54&&1629796351570
  46. 48&&1629796351570
  47. 55&&1629796351570
  48. get future at:1629796356570, result:hello
  49. get future at:1629796356570, result:world
  50. get future at:1629796356570, result:nice

一些注意点

  1. 在使用异步编排CompletableFuture对象设置值要注意数据是否是在ThreadLocal里,如果是在主线程的ThreadLocal,则不能直接使用,要先读值,再赋值。
    1. //1.0 主线程
    2. RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
    3. CompletableFuture<Void> getAddressFuture = CompletableFuture.runAsync(() -> {
    4. //2.0 子线程:把旧RequestAttributes放到新线程的RequestContextHolder中,该RequestContextHolder对象数据即在当前Thread下
    5. RequestContextHolder.setRequestAttributes(attributes);
    6. // 远程查询所有的收获地址列表
    7. List<MemberAddressVo> address;
    8. }, threadPoolExecutor);

    整合案例与总结

    串行化案例

需求:三个CompletableFuture,第一个CompletableFuture根据手机号查询Id,第二个CompletableFuture根据Id查询校区,第三个CompletableFuture根据校区查询该校区排名前10报名的学生。串行化代码如下:

  1. @RequestMapping("/update5")
  2. public void update5() {
  3. //1.0 第一个任务
  4. CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
  5. @Override
  6. public Integer get() {
  7. return search1();
  8. }
  9. }, executor);
  10. //2.0 第二个任务//不能这样定义,因为如果这样的话future2会自动创建并执行
  11. //CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(new Supplier<Object>() {
  12. // @Override
  13. // public Object get() {
  14. // return search2();
  15. // }
  16. //}, executor);
  17. //2.0 第二个任务
  18. String returnValue = future1.thenApplyAsync(new Function<Integer, String>() {
  19. @Override
  20. public String apply(Integer userId) {
  21. return search2(userId);
  22. }
  23. }, executor).exceptionally(new Function<Throwable, String>() {
  24. @Override
  25. public String apply(Throwable throwable) {
  26. return null;
  27. }
  28. }).get();
  29. System.out.println(Thread.currentThread().getName() + "&&" + System.currentTimeMillis()+"&&"+returnValue);
  30. }
  31. private Integer search1() {
  32. try {
  33. System.out.println(Thread.currentThread().getName() + "&&" + System.currentTimeMillis());
  34. //查询时间默认5秒钟
  35. Thread.sleep(5000);
  36. } catch (InterruptedException e) {
  37. e.printStackTrace();
  38. } finally {
  39. return 1;
  40. }
  41. }
  42. private String search2(Integer userId) {
  43. try {
  44. System.out.println(Thread.currentThread().getName() + "&&" + System.currentTimeMillis());
  45. //查询时间默认5秒钟
  46. Thread.sleep(5000);
  47. } catch (InterruptedException e) {
  48. e.printStackTrace();
  49. } finally {
  50. return "gz";
  51. }
  52. }
  53. 结果:
  54. pool-2-thread-1&&1629799019179
  55. //子线程:时隔5秒拿到上一个线程的执行返回值
  56. pool-2-thread-2&&1629799024180
  57. //主线程:时隔5秒后拿到上一个线程的执行返回值
  58. http-nio-8003-exec-1&&1629799029180&&gz
  59. 需要注意的一点:要得到子线程的返回值,还是得用get方法。

串行+并行化案例

先串行去查询商品的,只要任意一个返回结果,就进行下一步查询价格,查询价格也同时从新浪和网易查询,只要任意一个返回结果,就完成操作。

  1. @RequestMapping("/update5")
  2. public Student update5() throws ExecutionException, InterruptedException {
  3. Student student = new Student();
  4. //1.0 第一个任务(第一个任务先执行,内部需要执行5分钟)
  5. CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
  6. @Override
  7. public Integer get() {
  8. Integer integer = search1();
  9. student.setUserId(integer);
  10. return integer;
  11. }
  12. }, executor);
  13. //2.0 第二个任务(第二个任务和第一个任务可以并行执行,内部也需要执行5分钟)
  14. CompletableFuture<Void> future2 = CompletableFuture.runAsync(new Runnable() {
  15. @Override
  16. public void run() {
  17. String areaSchool = search2();
  18. student.setAreaSchool(areaSchool);
  19. }
  20. }, executor);
  21. //3.0 第三个任务(需要依靠第一个任务返回的数据,其内部执行也需要5秒)
  22. CompletableFuture<Void> future3 = future1.thenAcceptAsync(new Consumer<Integer>() {
  23. @Override
  24. public void accept(Integer integer) {
  25. try {
  26. Thread.sleep(5000);
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. System.out.println(Thread.currentThread().getName() + "&&" + System.currentTimeMillis());
  31. integer = integer + 10;
  32. student.setAge(integer);
  33. }
  34. });
  35. //4.0 使用allOf达到必须所有任务执行完的效果
  36. CompletableFuture<Void> futureAll = CompletableFuture.allOf(future2, future3);
  37. System.out.println(Thread.currentThread().getName() + "&&" + System.currentTimeMillis() + "&&Student" + student);
  38. //阻塞主进程,等待子进程全部执行完毕!
  39. futureAll.get();
  40. System.out.println(Thread.currentThread().getName() + "&&" + System.currentTimeMillis() + "&&Student" + student);
  41. return student;
  42. }
  43. //搜索子方法
  44. private Integer search1() {
  45. try {
  46. System.out.println(Thread.currentThread().getName() + "&&" + System.currentTimeMillis());
  47. //查询时间默认5秒钟
  48. Thread.sleep(5000);
  49. } catch (InterruptedException e) {
  50. e.printStackTrace();
  51. } finally {
  52. return 1;
  53. }
  54. }
  55. private String search2() {
  56. try {
  57. System.out.println(Thread.currentThread().getName() + "&&" + System.currentTimeMillis());
  58. //查询时间默认5秒钟
  59. Thread.sleep(5000);
  60. } catch (InterruptedException e) {
  61. e.printStackTrace();
  62. } finally {
  63. return "gz";
  64. }
  65. }
  66. 结果(为了对比直观,这里把currentTimeMillis的前几位都截掉了)
  67. pool-2-thread-1&&385015
  68. pool-2-thread-2&&385015
  69. http-nio-8003-exec-1&&385016&&StudentStudent(userId=null, areaSchool=null, age=null)
  70. ForkJoinPool.commonPool-worker-4&&395015
  71. http-nio-8003-exec-1&&395015&&StudentStudent(userId=1, areaSchool=gz, age=11)

总结

CompletableFuture的优点

  1. 异步任务结束时,会自动回调某个对象的方法(whenComplete/handle)
  2. 异步任务出错时,会自动回调某个对象的方法(exceptionally)
  3. 主线程设置好回调后,不再关心异步任务的执行
  4. 多个CompletableFuture可以串行执行(thenApply/thenAccept/thenRun)
  5. 多个CompletableFuture可以并行执行(both/either/allof/anyof)