11.4 响应式地消费 REST API

在第 7 章中,使用 ResTemplate 向 Taco Cloud API 发出客户端请求。RestTemplate 是一个旧的计时器,已经在 Spring3.0 版本中引入。在它的时代,它被用来代表使用它的应用程序发出无数的请求。

但是 ResTemplate 提供的所有方法都处理非响应式域类型和集合。这意味着如果你想以一种响应式的方式处理一个响应的数据,你需要用 Flux 或者 Mono 来包装它。如果你已经有了一个 Flux 或者 Mono,并且你想在 POST 或者 PUT 请求中发送它,那么你需要在发出请求之前将数据提取成一个非响应式类型。

如果有一种方法可以将 RestTemplate 原生地用于响应式类型,那就太好了。不要害,Spring 5 提供了 WebClient 作为 RestTemplate 的一个响应式替代品。WebClient 允许在向外部 API 发出请求时发送和接收响应类型。

使用 WebClient 与使用 RestTemplate 有很大不同。WebClient 没有几个方法来处理不同类型的请求,而是有一个流畅的构件式接口,用来描述和发送请求。使用 WebClient 的一般使用模式是:

  • 创建一个 WebClient 实例(或是注入一个 WebClient bean)
  • 指定用于发送请求的 HTTP 方法
  • 指定 URI 和应该存在于请求中 Header
  • 提交请求
  • 获取响应

让我们看看 WebClient 的几个实际例子,从如何使用 WebClient 发送 HTTP GET 请求开始。

11.4.1 GET 资源

作为 WebClient 使用的一个例子,假设需要从 Taco Cloud API 中获取一个 Ingredient 对象的 ID。可以使用 RestTemplate 的 getForObject() 方法。但是使用 WebClient,你需要构建请求、检索响应,然后提取一个发布 Ingredient 对象的 Mono:

  1. Mono<Ingredient> ingredient = WebClient.create().get()
  2. .uri("http://localhost:8080/ingredients/{id}", ingredientId)
  3. .retrieve()
  4. .bodyToMono(Ingredient.class);
  5. ingredient.subscribe(i -> { ... })

在这里,将使用 create() 创建一个新的 WebClient 实例。然后,使用 get() 和 uri() 定义对 http://localhost:8080/ingresents/{id} 的 GET 请求,其中 {id} 占位符将替换为 ingredentId 中的值。retrieve() 方法执行请求。最后,对 bodyToMono() 的调用将响应的 body 有效负载提取到 Mono 中,可以继续对其应用 addition Mono 操作。

要对bodyToMono()返回的Mono应用其他操作,在发送请求之前订阅它是很重要的。发出可以返回值集合的请求同样容易。例如,以下代码片段获取所有成分:

  1. Flux<Ingredient> ingredients = WebClient.create()
  2. .get()
  3. .uri("http://localhost:8080/ingredients")
  4. .retrieve()
  5. .bodyToFlux(Ingredient.class);
  6. ingredients.subscribe(i -> { ... })

对于大多数情况,获取多条数据与请求单个数据一样。最大的区别在于,你不用 bodyToMono() 来将响应体提取到 Mono 中,而是用 bodyToFlux() 来将其提取到 Flux 中。

和 bodyToMono() 一样,bodyToFlux() 返回的 Flux 还没有被订阅。这允许附加操作(过滤器、映射等)在数据开始流经 Flux 之前应用于 Flux。因此,订阅结果的 Flux 很重要,否则请求将永远不会被发送。

使用一个基本URI发出请求

你可能会发现,自己对许多不同的请求都使用一个通用的基本 URI。在这种情况下,创建一个带有基本 URI 的 WebClient 的 bean 注入到任何需要的地方是很有用的。这样的 bean 如下所示:

  1. @Bean
  2. public WebClient webClient() {
  3. return WebClient.create("http://localhost:8080");
  4. }

然后,在需要使用该基本 URI 发出请求的任何地方,都可以这样注入和使用 WebClient bean:

  1. @Autowired
  2. WebClient webClient;
  3. public Mono<Ingredient> getIngredientById(String ingredientId) {
  4. Mono<Ingredient> ingredient = webClient
  5. .get()
  6. .uri("/ingredients/{id}", ingredientId)
  7. .retrieve()
  8. .bodyToMono(Ingredient.class);
  9. ingredient.subscribe(i -> { ... })
  10. }

因为 WebClient 已经创建,所以可以通过调用 get() 获得使用权限。对于 URI,在调用 uri() 时,只需要指定相对于基 URI 的路径。

对长期运行的请求的定时处理

你可以指望的是,网络并不总是像你期望的那样可靠和快速。或者,远程服务器在处理请求时可能很慢。理想情况下,对远程服务的请求将在合理的时间内返回。如果不是,如果客户没有长时间等待响应,那已经非常值得庆幸了。

为了避免客户端请求被缓慢的网络或服务所阻塞,可以使用 Flux 或 Mono 中的 timeout() 方法来限制等待发布数据的时间。作为示例,考虑在获取成分数据时如何使用 timeout():

  1. Flux<Ingredient> ingredients = WebClient.create()
  2. .get()
  3. .uri("http://localhost:8080/ingredients")
  4. .retrieve()
  5. .bodyToFlux(Ingredient.class);
  6. ingredients
  7. .timeout(Duration.ofSeconds(1))
  8. .subscribe(
  9. i -> { ... },
  10. e -> {
  11. // handle timeout error
  12. })

在订阅 Flux 之前,调用 timeout(),指定了 1s 的持续时间。如果请求能在 1 秒内完成,那么就没问题。但是,如果请求花费的时间超过 1 秒,那么它将超时,并调用作为第二个参数传递给 subscribe() 的错误处理程序。

11.4.2 发送资源

使用 WebClient 发送数据与接收数据没有太大区别。例如,假设有一个 Mono,并且希望向 URI 发送一个 POST 请求,其中包含由 Mono 发布的成分以及 /ingredients 的相对路径。

只需使用 post() 方法而不是 get() 方法,并指定使用 Mono 来调用 body() 来填充请求体:

  1. Mono<Ingredient> ingredientMono = ...;
  2. Mono<Ingredient> result = webClient
  3. .post()
  4. .uri("/ingredients")
  5. .body(ingredientMono, Ingredient.class)
  6. .retrieve()
  7. .bodyToMono(Ingredient.class);
  8. result.subscribe(i -> { ... })

如果没有 Mono 或 Flux 要发送,而手头有原始域对象,那么可以使用 syncBody()。例如,假设有一个要在请求体中发送的 Ingredient,而不是 Mono

  1. Ingedient ingredient = ...;
  2. Mono<Ingredient> result = webClient
  3. .post()
  4. .uri("/ingredients")
  5. .syncBody(ingredient)
  6. .retrieve()
  7. .bodyToMono(Ingredient.class);
  8. result.subscribe(i -> { ... })
  9. `

如果你想要用 PUT 请求更新一个 Ingredient 而不是 POST 请求,那么就调用 put() 来代替 post(),并相应地调整 URI 路径:

  1. Mono<Void> result = webClient
  2. .put()
  3. .uri("/ingredients/{id}", ingredient.getId())
  4. .syncBody(ingredient)
  5. .retrieve()
  6. .bodyToMono(Void.class)
  7. .subscribe();

PUT 请求通常具有空的响应有效负载,因此必须请求 bodyToMono() 返回一个 Void 类型的 Mono。订阅 Mono 后,请求将被发送。

11.4.3 删除资源

WebClient 还允许通过它的 delete() 方法删除资源。例如,以下代码删除了给定 ID 的 Ingredient:

  1. Mono<Void> result = webClient
  2. .delete()
  3. .uri("/ingredients/{id}", ingredientId)
  4. .retrieve()
  5. .bodyToMono(Void.class)
  6. .subscribe();

与 PUT 请求一样,DELETE 请求通常没有有效负载。同样,你需要返回并订阅一个 Mono 来发送请求。

11.4.4 处理错误

到目前为止,所有的 WebClient 示例都假定有一个圆满的结局;没有包含 400 或 500 状态码的响应。如果返回这两种错误状态,WebClient 将记录失败日志;否则,WebClient 将默认忽略该错误。

如果你需要处理此类错误,那么可以使用对 onStatus() 的调用来指定应该如何处理各种 HTTP 状态码。onStatus() 接受两个函数:一个谓词函数是用于匹配 HTTP 状态;另一个函数是给定的 ClientResponse 对象,返回 Mono

为了演示如何使用 onStatus() 创建自定义错误处理程序,请考虑使用以下 WebClient,以获取给定 ID 的 Ingredient:

  1. Mono<Ingredient> ingredientMono = webClient
  2. .get()
  3. .uri("http://localhost:8080/ingredients/{id}", ingredientId)
  4. .retrieve()
  5. .bodyToMono(Ingredient.class);

只要 ingredientId 中的值与已知的成分资源匹配,那么当 Mono 订阅了 Ingredient 对象时,它就会发布该 Ingredient 对象。但是如果没有匹配的成分,会发生什么情况呢?

当订阅一个可能以错误结束的 Mono 或 Flux 时,在调用 subscribe() 时注册一个错误消费者与注册一个数据消费者同样重要:

  1. ingredientMono.subscribe(
  2. ingredient -> {
  3. // handle the ingredient data
  4. ...
  5. },
  6. error-> {
  7. // deal with the error
  8. ...
  9. });

如果找到该成分资源,那么将调用给予 subscribe() 的第一个 lambda(数据使用者)和匹配的 Ingredient 对象。但是,如果没有找到它,那么请求将使用 HTTP 404(NOTFOUND) 响应,这将导致第二个 lambda(错误消费者)在默认情况下被给予一个 WebClientResponseException。

WebClientResponseException 的最大问题是,它没有明确指出导致 Mono 失败的原因。它的名称表明在 WebClient 发出的请求的响应中有错误,但是你需要深入查看 WebClientResponseException 以了解出错的原因。无论如何,如果提供给错误使用者的异常是域特定的,而不是 WebClient,那就更好了。

通过添加自定义错误处理程序,可以提供将状态代码转换为自己选择的 Throwable 的代码。假设你想让一个对成分资源的失败请求导致 Mono 在 UnknownIngredientException 错误中完成。在调用 retrieve() 方法后,在 onStatus() 方法中添加调用,可以实现:

  1. Mono<Ingredient> ingredientMono = webClient
  2. .get()
  3. .uri("http://localhost:8080/ingredients/{id}", ingredientId)
  4. .retrieve()
  5. .onStatus(HttpStatus::is4xxClientError,
  6. response -> Mono.just(new UnknownIngredientException()))
  7. .bodyToMono(Ingredient.class);

onStatus() 调用中的第一个参数是一个谓词,它给定一个 HttpStatus,如果状态码是你想要处理的,则返回 true。如果状态码匹配,那么响应将返回到第二个参数中的函数,由它根据自身需要进行处理,最终返回一个类型为 Throwable 的 Mono。

在示例中,如果状态码是 400 级别的状态码(例如,客户端错误),那么 Mono 将返回一个 UnknownIngredientException。这导致该 ingredientMono 失败,并抛出异常。

注意 HttpStatus::is4xxClientError 是 HttpStatus 的 is4xxClientError 方法的一个方法引用。该方法将在给定的 HttpStatus 对象上调用。如果你愿意,可以在 HttpStatus 上使用另一个方法作为方法引用;或者可以以 lambda 或方法引用的形式提供返回布尔值的自己的函数。

例如,可以通过将调用更改为 onStatus() 来检查 HTTP 404(NOT FOUND) 状态,从而在错误处理方面获得更精确的结果:

  1. Mono<Ingredient> ingredientMono = webClient
  2. .get()
  3. .uri("http://localhost:8080/ingredients/{id}", ingredientId)
  4. .retrieve()
  5. .onStatus(status -> status == HttpStatus.NOT_FOUND,
  6. response -> Mono.just(new UnknownIngredientException()))
  7. .bodyToMono(Ingredient.class);

同样值得一提的是,在处理响应中可能返回的任何 HTTP 状态码时,都可以调用 onStatus()。

11.4.5 交换请求

到目前为止,在使用 WebClient 时,已经使用 retrieve() 方法表示发送请求。在这些情况下,retrieve() 方法返回一个 ResponseSpec 类型的对象,通过该方法,可以通过调用 onStatus()、bodyToFlux() 和 bodyToMono() 等方法来处理响应。使用 ResponseSpec 对于简单的案例来说是好的,但是在某些方面它有局限性。例如,如果需要访问响应头或 cookie 值,那么 ResponseSpec 就不合适了。

当 ResponseSpec 出现短缺时,可以尝试调用 exchange() 而不是 retrieve()。exchange() 方法返回类型为 ClientResponse 的 Mono,可以在该方法上应用响应式操作来检查和使用来自整个响应的数据,包括有效负载、报头和 Cookie。

在我们研究 exchange() 和 retrieve() 的区别之前,让我们先看看它们之间的相似程度。下面的代码片段使用 WebClient 和 exchange() 通过成分 ID 获取单个成分:

  1. Mono<Ingredient> ingredientMono = webClient
  2. .get()
  3. .uri("http://localhost:8080/ingredients/{id}", ingredientId)
  4. .exchange()
  5. .flatMap(cr -> cr.bodyToMono(Ingredient.class));

这大致相当于下面的使用 retrieve() 的例子:

  1. Mono<Ingredient> ingredientMono = webClient
  2. .get()
  3. .uri("http://localhost:8080/ingredients/{id}", ingredientId)
  4. .retrieve()
  5. .bodyToMono(Ingredient.class);

在 exchange() 示例中,不使用 ResponseSpec 的 bodyToMono() 来获取一个 Mono,而是得到一个 Mono,在这个基础上,你可以应用一个平面映射函数来将 ClientResponse 映射到一个 Mono,这个映射函数被扁平化为最终的 Mono。

现在让我们来看看 exchanger() 有什么不同。我们假设来自请求的响应可能包含一个名为 X_UNAVAILABLE 的 header,其值为 true,以指示(由于某种原因)所述成分不可用。为了便于讨论,假设该 header 存在,你希望得到的 Mono 是空的,以便不返回任何内容。可以通过向 flatMap() 添加另一个调用来实现这个场景,这样整个 WebClient 调用看起来就像这样:

  1. Mono<Ingredient> ingredientMono = webClient
  2. .get()
  3. .uri("http://localhost:8080/ingredients/{id}", ingredientId)
  4. .exchange()
  5. .flatMap(cr -> {
  6. if (cr.headers().header("X_UNAVAILABLE").contains("true")) {
  7. return Mono.empty();
  8. }
  9. return Mono.just(cr);
  10. })
  11. .flatMap(cr -> cr.bodyToMono(Ingredient.class));

新的 flatMap() 调用检查给定的 ClientRequest 对象头,寻找一个名为 X_UNAVAILABLE,值为 true 的 header。如果找到,它返回一个空的 Mono。否则,它将返回包含 ClientResponse 的 Mono。无论哪种情况,返回的 Mono 都将平铺成 Mono,以便下一个 flatMap() 的调用操作。