参考: https://xie.infoq.cn/article/deba3305beba3838c2cf6c102
https://blog.csdn.net/ffj0721/article/details/100262873
https://javakk.com/225.html
https://blog.51cto.com/u_15127620/2758372
https://blog.csdn.net/aei600024/article/details/102002813
https://zhuanlan.zhihu.com/p/422946792
https://www.jianshu.com/p/d7fa576e50da
https://zhuanlan.zhihu.com/p/446797833
https://blog.csdn.net/hellozhxy/article/details/119239776
同步与异步
- 同步方式代码简单直观, 但是由于阻塞操作, 降低了系统资源的利用率
-
Future
Future接口是Java对异步编程的第一个解决方案, 它代表着异步计算的结果
- Future接口提供了等待计算完成获取结果, 尝试取消任务等功能
Future接口存在的缺陷
- 由于缺少回调机制, 无法得知任务何时完成
- 无法方便获取任务结果, 在主线程直接获取任务结果是阻塞操作, 轮询又会浪费CPU资源
- 复杂场景下无法实现异步线程编排, 比如异步任务结果之间存在依赖, 需要合并等
Callback
为解决Future存在的问题, 人们提出Callback的解决方案, Callback 是将任务执行结果作为接口的入参,在任务完成时回调 Callback 接口,执行后续任务,从而解决纯 Future 方案无法方便获得任务执行结果的问题
如Google Guava 包中的 ListenableFuture
但 Callback 产生了新的问题:
那就是代码可读性的问题。因为使用 Callback 之后,代码的字面形式和其所表达的业务含义不匹配,即业务的先后关系到了代码层面变成了包含和被包含的关系, 因此,如果大量使用 Callback 机制,将使大量的应该是先后的业务逻辑在代码形式上表现为层层嵌套。这会导致代码难以理解和维护。这便是所谓的 Callback Hell(回调地狱)
CompletableFuture
CompletableFuture是JDK8对Future接口的增强
- 提供了函数式编程写法, 通过回调获取异步结果, 语义清晰, 代码简练, 使开发者专注于业务逻辑
- 默认使用forkJoinPool线程池, 无需手工维护线程
- 结合completionStage接口, 提供了异步线程编排的能力, 能满足大多数业务场景
响应式编程
编程范式
- 命令式编程, 关注计算机的执行步骤, 即一步步告诉计算机先做什么再做什么, 常见的编程方式都为命令式编程
- 声明式编程, 以数据结构的形式来表达程序执行的逻辑, 告诉计算机应该做什么, 但不指定具体要怎么做
如SQL语句, HTML, CSS等, 特点是不需要创建变量来存储数据, 以及不包含循环控制的代码
- 函数式编程, 同声明式编程的思想, 只关注做什么, 而不关注怎么做, 特点是函数为第一位, 函数可以作为参数传递, 以及作为返回值, 如Java中的lambda以及闭包的概念
- 响应式编程, 也称反应式编程, 是一种面向数据流和变化传播的编程范式, 使用异步数据流进行编程, 关注数据的流转过程而不是逻辑控制, 如Reactive Stream规范, 以及实现了规范的Akka Stream, Project Reactor, Vert.x, RxJava等框架
响应式编程模型的运行方式
- 首先是Subscriber(订阅者)主动订阅 Publisher(发布者),通过调用 Publisher 的 subscribe 方法
- Publisher 在向下游发送数据之前,会先调用 Subscriber 的 onSubscribe 方法,传递的参数为 Subscription(订阅媒介)
- Subscriber 通过 Subscription#request 来请求数据,或者 Subscription#cancel 来取消数据发布(这就是响应式编程中的背压,订阅者可以控制数据发布)
- Subscription 在接收到订阅者的调用后,通过 Subscriber#onNext 向下游订阅者传递数据。
- 在数据发布完成后,调用 Subscriber#onComplete 结束本次流,如果数据发布或者处理遇到错误会调用 Subscriber#onError
Java 8, RxJava, Reactor对比
参考: https://blog.51cto.com/u_15127620/2758372
ProjectReactor
Reactor 是 JVM 的非阻塞响应式编程基础,支持背压。 它直接与 Java 8 函数式 API 集成,特别是 CompletableFuture、Stream 和 Duration。 它提供了可组合的异步序列 API — Flux(用于 [N] 个元素)和 Mono(用于 [0|1] 个元素),并实现了 Reactive Streams 规范。
相比CompletableFuture, Reactor库有以下优势:
- 支持背压
- 可组合性和可读性, 提供了更为强大的异步任务编排能力, 使代码更简洁, 也避免了回调地狱, 如下官方示例
但同样也存在问题:
- 响应式编程是针对数据流的编程方式, 问题调试debug会更加困难
- 响应式编程的上下文传递会更加困难, 甚至失效, 比如LogBack的MDC机制, reactor3.1提供了context, 但需要手动绑定到序列中, 使链中的每个操作员可以访问它
其他
协程
概念
- 协程是一个特殊的函数, 可以在某个地方挂起, 并且可以重新在挂起处继续运行
- 一个进程包含多个线程, 一个线程可以包含多个协程
- 一个线程中的多个协程可以切换, 但运行是绝对串行的
- 线程由内核调度, 而协程的调度由进程自身完成, 这样就可以不受操作系统对线程数量的限制, 一个线程内部可以创建成千上万个协程
使用场景
- 一个线程内多个协程串行执行, 因此无法利用CPU多核能力, 因此协程不适合计算密集型场景, 而适合IO阻塞型
- 当一个线程上的协程发生IO阻塞时, 可以立即切换到其他协程执行, 以提高效率
- 当IO密集型时, 由于不能利用多核, 因此需要”多进程+协程”处理
相关技术实现:
- Quasar、Kilim, 开源的Java轻量级线程(协程)框架,通过利用Java instrument技术对字节码进行修改,使方法挂起前后可以保存和恢复JVM栈帧,方法内部已执行到的字节码位置也通过增加状态机的方式记录,在下次恢复执行可直接跳转至最新位置
- Kotlin Coroutine 协程库,因为 Kotlin 的运行依赖于 JVM,不能对 JVM 进行修改,因此Kotlin不能在底层支持协程。同时Kotlin 是一门编程语言,需要在语言层面支持协程,所以Kotlin 对协程支持最核心的部分是在编译器中完成,这一点其实和Quasar、Kilim实现原理类似,都是在编译期通过修改字节码的方式实现协程
示例:
查询最近邮件数(反应式编程版)
@GetMapping("/reactive/{personId}")
fun getMessagesFor(@PathVariable personId: String): Mono<String> {
return peopleRepository.findById(personId)
.switchIfEmpty(Mono.error(NoSuchElementException()))
.flatMap { person ->
auditRepository.findByEmail(person.email)
.flatMap { lastLogin ->
messageRepository.countByMessageDateGreaterThanAndEmail(lastLogin.eventDate, person.email)
.map { numberOfMessages ->
"Hello ${person.name}, you have $numberOfMessages messages since ${lastLogin.eventDate}"
}
}
}
}
查询最近邮件数(Kotlin协程版)
@GetMapping("/coroutine/{personId}")
fun getNumberOfMessages(@PathVariable personId: String) = mono(Unconfined) {
val person = peopleRepository.findById(personId).awaitFirstOrDefault(null)
?: throw NoSuchElementException("No person can be found by $personId")
val lastLoginDate = auditRepository.findByEmail(person.email).awaitSingle().eventDate
val numberOfMessages =
messageRepository.countByMessageDateGreaterThanAndEmail(lastLoginDate, person.email).awaitSingle()
"Hello ${person.name}, you have $numberOfMessages messages since $lastLoginDate"
}
纤程
Project Loom引入了纤程Fiber的概念, Fiber与Thread类一起为Strand的子类, Fiber提供了比传统Thread更轻量级的虚拟的线程的概念, 破除了Java的线程限制(Java的线程与操作系统线程一一对应), 提高了并发能力; 不同于之前的方案,Project Loom 是从 JVM 层面对多线程技术进行彻底的改变
使用示例
Fiber f = Fiber.schedule(() -> {
println("Hello 1");
lock.lock(); // 等待锁不会挂起线程
try {
println("Hello 2");
} finally {
lock.unlock();
}
println("Hello 3");
})