参考: https://xie.infoq.cn/article/deba3305beba3838c2cf6c102
https://blog.csdn.net/ffj0721/article/details/100262873
https://javakk.com/225.html
https://blog.51cto.com/u_15127620/2758372
https://blog.csdn.net/aei600024/article/details/102002813
https://zhuanlan.zhihu.com/p/422946792
https://www.jianshu.com/p/d7fa576e50da
https://zhuanlan.zhihu.com/p/446797833
https://blog.csdn.net/hellozhxy/article/details/119239776

同步与异步

  • 同步方式代码简单直观, 但是由于阻塞操作, 降低了系统资源的利用率
  • 异步方式为非阻塞, 有利于提高系统弹性, 容错性等

    Future

  • Future接口是Java对异步编程的第一个解决方案, 它代表着异步计算的结果

  • Future接口提供了等待计算完成获取结果, 尝试取消任务等功能

image.png

Future接口存在的缺陷

  • 由于缺少回调机制, 无法得知任务何时完成
  • 无法方便获取任务结果, 在主线程直接获取任务结果是阻塞操作, 轮询又会浪费CPU资源
  • 复杂场景下无法实现异步线程编排, 比如异步任务结果之间存在依赖, 需要合并等

Callback

为解决Future存在的问题, 人们提出Callback的解决方案, Callback 是将任务执行结果作为接口的入参,在任务完成时回调 Callback 接口,执行后续任务,从而解决纯 Future 方案无法方便获得任务执行结果的问题

如Google Guava 包中的 ListenableFuture
image.png

但 Callback 产生了新的问题:
那就是代码可读性的问题。因为使用 Callback 之后,代码的字面形式和其所表达的业务含义不匹配,即业务的先后关系到了代码层面变成了包含和被包含的关系, 因此,如果大量使用 Callback 机制,将使大量的应该是先后的业务逻辑在代码形式上表现为层层嵌套。这会导致代码难以理解和维护。这便是所谓的 Callback Hell(回调地狱)
Java异步编程技术 - 图3

CompletableFuture

CompletableFuture是JDK8对Future接口的增强

  • 提供了函数式编程写法, 通过回调获取异步结果, 语义清晰, 代码简练, 使开发者专注于业务逻辑
  • 默认使用forkJoinPool线程池, 无需手工维护线程
  • 结合completionStage接口, 提供了异步线程编排的能力, 能满足大多数业务场景

image.png

响应式编程

编程范式

  • 命令式编程, 关注计算机的执行步骤, 即一步步告诉计算机先做什么再做什么, 常见的编程方式都为命令式编程
  • 声明式编程, 以数据结构的形式来表达程序执行的逻辑, 告诉计算机应该做什么, 但不指定具体要怎么做

如SQL语句, HTML, CSS等, 特点是不需要创建变量来存储数据, 以及不包含循环控制的代码

  • 函数式编程, 同声明式编程的思想, 只关注做什么, 而不关注怎么做, 特点是函数为第一位, 函数可以作为参数传递, 以及作为返回值, 如Java中的lambda以及闭包的概念
  • 响应式编程, 也称反应式编程, 是一种面向数据流和变化传播的编程范式, 使用异步数据流进行编程, 关注数据的流转过程而不是逻辑控制, 如Reactive Stream规范, 以及实现了规范的Akka Stream, Project Reactor, Vert.x, RxJava等框架

响应式编程模型的运行方式

Java异步编程技术 - 图5

  1. 首先是Subscriber(订阅者)主动订阅 Publisher(发布者),通过调用 Publisher 的 subscribe 方法
  2. Publisher 在向下游发送数据之前,会先调用 Subscriber 的 onSubscribe 方法,传递的参数为 Subscription(订阅媒介)
  3. Subscriber 通过 Subscription#request 来请求数据,或者 Subscription#cancel 来取消数据发布(这就是响应式编程中的背压,订阅者可以控制数据发布)
  4. Subscription 在接收到订阅者的调用后,通过 Subscriber#onNext 向下游订阅者传递数据。
  5. 在数据发布完成后,调用 Subscriber#onComplete 结束本次流,如果数据发布或者处理遇到错误会调用 Subscriber#onError

Java 8, RxJava, Reactor对比

参考: https://blog.51cto.com/u_15127620/2758372
Java异步编程技术 - 图6

ProjectReactor

Reactor 是 JVM 的非阻塞响应式编程基础,支持背压。 它直接与 Java 8 函数式 API 集成,特别是 CompletableFuture、Stream 和 Duration。 它提供了可组合的异步序列 API — Flux(用于 [N] 个元素)和 Mono(用于 [0|1] 个元素),并实现了 Reactive Streams 规范。

相比CompletableFuture, Reactor库有以下优势:

  • 支持背压
  • 可组合性和可读性, 提供了更为强大的异步任务编排能力, 使代码更简洁, 也避免了回调地狱, 如下官方示例

image.png
image.png

但同样也存在问题:

  • 响应式编程是针对数据流的编程方式, 问题调试debug会更加困难
  • 响应式编程的上下文传递会更加困难, 甚至失效, 比如LogBack的MDC机制, reactor3.1提供了context, 但需要手动绑定到序列中, 使链中的每个操作员可以访问它

其他

协程

概念

  • 协程是一个特殊的函数, 可以在某个地方挂起, 并且可以重新在挂起处继续运行
  • 一个进程包含多个线程, 一个线程可以包含多个协程
  • 一个线程中的多个协程可以切换, 但运行是绝对串行的
  • 线程由内核调度, 而协程的调度由进程自身完成, 这样就可以不受操作系统对线程数量的限制, 一个线程内部可以创建成千上万个协程

使用场景

  • 一个线程内多个协程串行执行, 因此无法利用CPU多核能力, 因此协程不适合计算密集型场景, 而适合IO阻塞型
  • 当一个线程上的协程发生IO阻塞时, 可以立即切换到其他协程执行, 以提高效率
  • 当IO密集型时, 由于不能利用多核, 因此需要”多进程+协程”处理

相关技术实现:

  • Quasar、Kilim, 开源的Java轻量级线程(协程)框架,通过利用Java instrument技术对字节码进行修改,使方法挂起前后可以保存和恢复JVM栈帧,方法内部已执行到的字节码位置也通过增加状态机的方式记录,在下次恢复执行可直接跳转至最新位置
  • Kotlin Coroutine 协程库,因为 Kotlin 的运行依赖于 JVM,不能对 JVM 进行修改,因此Kotlin不能在底层支持协程。同时Kotlin 是一门编程语言,需要在语言层面支持协程,所以Kotlin 对协程支持最核心的部分是在编译器中完成,这一点其实和Quasar、Kilim实现原理类似,都是在编译期通过修改字节码的方式实现协程

示例:
查询最近邮件数(反应式编程版)

  1. @GetMapping("/reactive/{personId}")
  2. fun getMessagesFor(@PathVariable personId: String): Mono<String> {
  3. return peopleRepository.findById(personId)
  4. .switchIfEmpty(Mono.error(NoSuchElementException()))
  5. .flatMap { person ->
  6. auditRepository.findByEmail(person.email)
  7. .flatMap { lastLogin ->
  8. messageRepository.countByMessageDateGreaterThanAndEmail(lastLogin.eventDate, person.email)
  9. .map { numberOfMessages ->
  10. "Hello ${person.name}, you have $numberOfMessages messages since ${lastLogin.eventDate}"
  11. }
  12. }
  13. }
  14. }

查询最近邮件数(Kotlin协程版)

  1. @GetMapping("/coroutine/{personId}")
  2. fun getNumberOfMessages(@PathVariable personId: String) = mono(Unconfined) {
  3. val person = peopleRepository.findById(personId).awaitFirstOrDefault(null)
  4. ?: throw NoSuchElementException("No person can be found by $personId")
  5. val lastLoginDate = auditRepository.findByEmail(person.email).awaitSingle().eventDate
  6. val numberOfMessages =
  7. messageRepository.countByMessageDateGreaterThanAndEmail(lastLoginDate, person.email).awaitSingle()
  8. "Hello ${person.name}, you have $numberOfMessages messages since $lastLoginDate"
  9. }

纤程

Project Loom引入了纤程Fiber的概念, Fiber与Thread类一起为Strand的子类, Fiber提供了比传统Thread更轻量级的虚拟的线程的概念, 破除了Java的线程限制(Java的线程与操作系统线程一一对应), 提高了并发能力; 不同于之前的方案,Project Loom 是从 JVM 层面对多线程技术进行彻底的改变

使用示例

  1. Fiber f = Fiber.schedule(() -> {
  2. println("Hello 1");
  3. lock.lock(); // 等待锁不会挂起线程
  4. try {
  5. println("Hello 2");
  6. } finally {
  7. lock.unlock();
  8. }
  9. println("Hello 3");
  10. })