11.1 使用 Spring WebFlux

典型的基于 Servlet 的 web 框架,比如 Spring MVC,本质上是阻塞和多线程的,每个连接使用一个线程。在处理请求时,将从线程池中提取一个工作线程来处理该请求。同时,请求线程被阻塞,直到工作线程通知它已完成为止。

因此,在请求量很大的情况下,阻塞 web 框架不能有效地扩展。慢工作线程中的延迟使情况更糟,因为工作线程池准备处理另一个请求所需的时间更长。在某些用例中,这种工作方式是完全可以接受的。事实上,这在很大程度上是大多数 web 应用程序十多年来的开发方式,但时代在变。

这些 web 应用程序伴随着 HTTP API,已经从人们偶尔浏览网站成长为人们经常消费内容和使用应用程序。现在,所谓的 物联网(其中甚至没有人参与)产生的汽车、喷气发动机以及其他非传统的客户不断地通过 web API 交换数据。随着越来越多的客户使用 web 应用程序,扩展性比以往任何时候都更加重要。

相比之下,异步 web 框架实现用较少的线程达到更高的可扩展性,通常一个 CPU 一个线程。通过应用被称为 event looping 的技术(如图 11.1 所示),这些框架的每个线程都能够处理许多请求,使得每个连接的成本低 。

图 11.1 异步 web 框架通过应用 event looping,使用较少的线程处理更多的请求

11.1

在一个 event loop 中,一切皆为事件,其中包括像是数据库和网络操作这种密集操作的请求与回调。当需要完成一个重要的操作时,event loop 并行地为那个操作注册一个回调,然后它继续去处理其他事件。

当操作完成后,它会被 event loop 视为一个 event,对于请求也是一样的操作。这样异步 web 框架就能够使用更少的线程应对繁重的请求,从而实现更好的扩展性,这样做的结果就是降低了线程管理的开销。

Spring 5 已经基于 Project Reactor 推出了一个非阻塞异步 web 框架,以解决在 web 应用程序和 API 更大的可扩展性。让我们来看看 Spring WebFlux —— 一个响应式 web 框架。

11.1.1 Spring WebFlux 介绍

当 Spring 团队正在考虑如何添加一个响应式编程模型的网络层,很快就发现,如果不在 Spring MVC 做很大的改动,很明显这样做是很困难的。这将涉及到分支代码来决定是否响应式地处理请求。在本质上,其结果将是把两个 web 框架打包成一个,用 if 语句来分离响应式与非响应式。

最终决定创建一个单独的响应式 web 框架,这个框架尽可能的借鉴 Spring MVC,而不是强行把响应式编程模型塞进 Spring MVC 中。Spring WebFlux 就是这个框架了。图 11.2 展示了由 Spring 5 所定义的完整的 web 开发技术栈。

图 11.2 Spring 5 通过名为 WebFlux 的新 web 框架支持响应式式 web 应用程序,WebFlux 是 Spring MVC 的兄弟,它们共享许多核心组件

11.2

在图 11.2 的左侧,可以看到 SpringMVC 技术栈,它是在 Spring 框架的 2.5 版中引入的。SpringMVC(在第 2 章和第 6 章中介绍)位于 Java Servlet API 之上,它需要一个 Servlet 容器(比如 Tomcat)来执行。

相比之下,Spring WebFlux(在右侧)与 Servlet API 没有关系,因此它构建在一个响应式 HTTP API 之上,这个方式与使用 Servlet API 提供的相同的响应式功能类似。而且由于 Spring WebFlux 没有耦合到 Servlet API,因此它不需要运行一个 Servlet 容器。相反,它可以在任何非阻塞 web 容器上运行,包括 Netty、Undertow、Tomcat、Jetty 或任何 Servlet3.1 或更高版本的容器。

图 11.2 最值得注意的是左上角的框,它表示了 Spring MVC 和 Spring WebFlux 之间常见的组件,主要是用于定义 controller 的注解。由于 Spring MVC 和 Spring WebFlux 共享相同的注解,Spring WebFlux 在许多方面与 Spring MVC 没有区别。

右上角的框表示另一种编程模型,该模型使用函数式编程范式而不是使用注解来定义 controller。我们将在第 11.2 节中详细讨论 Spring 的函数式 web 编程模型。

Spring MVC 和 Spring WebFlux 之间最显著的区别就是添加到构建中的依赖项不同。在使用 Spring WebFlux 时,需要添加 Spring Boot WebFlux starter 依赖项,而不是标准的 web starter(例如,spring-boot-starter-web)。在项目的 pom.xml 文件中,如下所示:

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-webflux</artifactId>
  4. </dependency>

注意:与大多数 Spring Boot 的 starter 依赖项一样,这个 starter 也可以通过选中 initializer 中的 Reactive Web 复选框添加到项目中。

使用 WebFlux 而不是 Spring MVC 的一个有趣的副作用是,WebFlux 的默认嵌入式服务器是 Netty 而不是 Tomcat。Netty 是少数几个异步的事件驱动的服务器之一,它自然适合像 Spring WebFlux 这样的响应式 web 框架。

除了使用不同的 starter 依赖项之外,Spring WebFlux controller 方法通常接受并返回响应式类型,比如 Mono 和 Flux,而不是域类型和集合。Spring WebFlux 控制器还可以处理 RxJava 类型,比如 Observable、Single 和 Completable。

响应式 Spring MVC?

尽管 Spring WebFlux controller 通常返回 Mono 和 Flux,但这并不意味着 Spring MVC 在处理响应式类型时没有办法。如果你愿意,Spring MVC controller 方法也可以返回 Mono 或 Flux。

不同之处在于如何使用这些类型。Spring WebFlux 是一个真正的响应式 web 框架,允许在 event loop 中处理请求,而 Spring MVC 是基于 Servlet 的,依赖多线程处理多个请求。

让我们通过重写一些 Taco Cloud 的 API controller 来让 Spring WebFlux 工作。

11.1.2 编写响应式 controller

你可能还记得,在第 6 章中,你为 Taco Cloud 的 REST API 创建了一些 controller。这些 controller 具有处理请求的方法,这些方法根据域类型(如 Order 和 Taco)或域类型的集合,处理输入和输出。提醒一下,请考虑你在第 6 章中写过的 DesignTacoController 中的以下片段:

  1. @RestController
  2. @RequestMapping(path="/design", produces="application/json")
  3. @CrossOrigin(origins="*")
  4. public class DesignTacoController {
  5. ...
  6. @GetMapping("/recent")
  7. public Iterable<Taco> recentTacos() {
  8. PageRequest page = PageRequest.of(
  9. 0, 12, Sort.by("createdAt").descending());
  10. return tacoRepo.findAll(page).getContent();
  11. }
  12. ...
  13. }

如前所述,recentTacos() controller 处理 /design/recent 的 HTTP GET 请求,以返回最近创建的 tacos 的列表。更具体地说,它返回一个 Iterable 类型的 Taco。这主要是因为这是从 respository 的 findAll() 方法返回的,或者更准确地说,是从 findAll() 返回的页面对象的 getContent() 方法返回的。

这很好,但是 Iterable 不是一个响应式的。你将不能对它应用任何响应式操作,也不能让框架利用它作为响应式类型在多个线程上分割任何工作。你想要的是 recentTacos() 返回一个 Flux

这里有一个简单但有点有限的选项,就是重写 recentTacos() 将 Iterable 转换为 Flux。而且,当你使用它时,可以去掉分页代码,并用调用 take() 来替换它:

  1. @GetMapping("/recent")
  2. public Flux<Taco> recentTacos() {
  3. return Flux.fromIterable(tacoRepo.findAll()).take(12);
  4. }

使用 Flux.fromIterable(),可以将 Iterable 转换为 Flux。现在你正在使用一个 Flux,可以使用take() 操作将返回的 Flux 限制为最多 12 个 Taco 对象。不仅代码简单,它还处理一个响应式 Flux,而不是一个简单的 Iterable。

迄今为止,编写响应式代码是一个成功的举措。但是,如果 repository 提供了一个可以开始使用的 Flux,那就更好了,这样就不需要进行转换。如果是这样的话,那么 recentTacos() 可以写成如下:

  1. @GetMapping("/recent")
  2. public Flux<Taco> recentTacos() {
  3. return tacoRepo.findAll().take(12);
  4. }

那就更好了!理想情况下,一个响应式 cotroller 将是一个端到端的响应式栈的顶端,包括 controller、repository、database 和任何可能位于两者之间的 serviec。这种端到端的响应式栈如图 11.3 所示:

图 11.3 为了最大限度地发挥响应式 web 框架的优势,它应该是完整的端到端响应式堆栈的一部分

11.3

这样的端到端的栈要求 repository 被写入以返回一个 Flux,而不是一个Iterable。在下一章中,我们将探讨如何编写响应式 repostitory,但下面我们将看一看响应式 TacoRepository 可能是什么样子:

  1. public interface TacoRepository extends ReactiveCrudRepository<Taco, Long> {
  2. }

然而,在这一点上,最重要的是,除了使用 Flux 而不是 Iterable 以及如何获得 Flux 外,定义响应式 WebFlux controller 的编程模型与非响应式 Spring MVC controller 没有什么不同。两者都用 @RestController 和类级别的 @RequestMapping 进行了注解。它们都有请求处理函数,在方法级别用 @GetMapping 进行注解。真正的问题是处理程序方法返回什么类型。

另一个要做的重要观察是,尽管从 repository 中获得了一个 Flux,但你可以在不调用 subscribe() 的情况下返回它。实际上,框架将为你调用 subscribe()。这意味着当处理对 /design/recent 的请求时,recentTacos() 方法将被调用,并在从数据库中获取数据之前返回!

返回单个值

作为另一个例子,请考虑 DesignTacoController 中的 tacoById() 方法,如第 6 章中所述:

  1. @GetMapping("/{id}")
  2. public Taco tacoById(@PathVariable("id") Long id) {
  3. Optional<Taco> optTaco = tacoRepo.findById(id);
  4. if (optTaco.isPresent()) {
  5. return optTaco.get();
  6. }
  7. return null;
  8. }

在这里,这个方法处理 /design/{id} 的 GET 请求并返回一个 Taco 对象。因为 repository 的 findById() 返回一个 Optional,所以还必须编写一些笨拙的代码来处理这个问题。但是假设 findById() 返回 Mono 而不是 Optional。在这种情况下,可以重写 controller 的 tacoById(),如下所示:

  1. @GetMapping("/{id}")
  2. public Mono<Taco> tacoById(@PathVariable("id") Long id) {
  3. return tacoRepo.findById(id);
  4. }

哇!这就简单多了。然而,更重要的是,通过返回 Mono 而不是 Taco,可以使 Spring WebFlux 以一种被动的方式处理响应。因此,你的API将更好地响应大的负载。

使用 RxJava 类型

值得指出的是,虽然在使用 Spring WebFlux 时,像 Flux 和 Mono 这样的 Reactor 类型是一个自然的选择,但是你也可以选择使用像 Observable 和 Single 这样的 RxJava 类型。例如,假设 DesignTacoController 和后端 repository 之间有一个 service,它处理 RxJava 类型。在这种情况下,recentTacos() 方法的编写方式如下:

  1. @GetMapping("/recent")
  2. public Observable<Taco> recentTacos() {
  3. return tacoService.getRecentTacos();
  4. }

类似地,可以编写 tacoById() 方法来处理 RxJava 的 Single 元素,而不是 Mono:

  1. @GetMapping("/{id}")
  2. public Single<Taco> tacoById(@PathVariable("id") Long id) {
  3. return tacoService.lookupTaco(id);
  4. }

此外,Spring WebFlux controller 方法还可以返回 RxJava 的 Completable,这相当于 Reactor 中的 Mono。WebFlux 还可以返回一个 Flowable,作为 Observable 或 Reactor 的 Flux 的替代。

响应式地处理输入

到目前为止,我们只关心控制器方法返回的响应式类型。但是使用 Spring WebFlux,你还可以接受 Mono 或 Flux 作为处理程序方法的输入。请考虑 DesignTacoController 中 postTaco() 的原始实现:

  1. @PostMapping(consumes="application/json")
  2. @ResponseStatus(HttpStatus.CREATED)
  3. public Taco postTaco(@RequestBody Taco taco) {
  4. return tacoRepo.save(taco);
  5. }

正如最初编写的,postTaco() 不仅返回一个简单的 Taco 对象,而且还接受一个绑定到请求主体内容的 Taco 对象。这意味着在请求有效负载完全解析并用于实例化 Taco 对象之前,无法调用 postTaco()。这也意味着postTaco() 在对 repository 的 save() 方法的阻塞调用,在返回之前无法返回。简言之,请求被阻塞了两次:当它进入 postTaco() 时,然后在 postTaco() 内部被再次阻塞。但是,通过对 postTaco() 应用一点响应式编码,可以使其成为一种完全无阻塞的请求处理方法:

  1. @PostMapping(consumes="application/json")
  2. @ResponseStatus(HttpStatus.CREATED)
  3. public Mono<Taco> postTaco(@RequestBody Mono<Taco> tacoMono) {
  4. return tacoRepo.saveAll(tacoMono).next();
  5. }

在这里,postTaco() 接受 Mono 并调用 repository 的 saveAll() 方法,正如你将在下一章中看到的,该方法接受 Reactive Streams Publisher 的任何实现,包括 Mono 或 Flux。saveAll() 方法返回一个 Flux,但是因为是从 Mono 开始的,所以 Flux 最多会发布一个 Taco。因此,你可以调用 next() 来获取将从 postTaco() 返回的 Mono

通过接受 Mono 作为输入,可以立即调用该方法,而无需等待 Taco 从请求体被解析。由于 repository 也是被动的,它将接受一个 Mono 并立即返回一个 Flux,从中调用 next() 并返回 Mono。所有这些都是在处理请求之前完成的!

Spring WebFlux 是 Spring MVC 的一个极好的替代品,它提供了使用与 Spring MVC 相同的开发模型编写响应式 web 应用程序的选项。不过,Spring 5 还有另一个新的窍门。让我们看看如何使用 Spring 5 的新函数式编程风格创建响应式 API。