我们在之前的教程中编写了两个做单元测试的类来自动测试那些访问数据库的方法。在测试各种对外暴露的 API 接口时,使用了 Postman 这样的工具进行手工测试。在实际的项目开发中,对接口的测试也应该编写测试代码来自动完成,此时就涉及到如何编写 Web 客户端程序(访问要测试的接口)。

13.1 阻塞式与非阻塞式客户端

用 Spring 开发 Web 客户端功能通常有两种方案:使用 RestTemplate 和 WebClient。

13.1.1 RestTemplate 阻塞式客户端

很长一段时间以来,Spring 一直提供 RestTemplate 作为 Web 客户端的抽象模型。在底层,RestTemplate 使用了基于每个请求对应一个线程模型(thread-per-request)的 Java Servlet API。这意味着直到 Web 客户端收到响应之前,线程都一直处于被阻塞状态,这导致每个线程都消耗了一定的内存和处理器周期。

如果发出了很多请求在等待产生结果,同时请求的又是一些慢服务,那么这些等待结果的请求迟早会堆积起来,创建很多线程,这些线程会消耗(甚至于耗尽)线程池和可用内存。此外由于频繁的处理器上下文(线程)切换,还会导致整个应用系统的性能大幅度地降低。

13.1.2 WebClient 非阻塞式客户端

WebClient 是Spring 5 中全新的方案,是 Spring WebFlux 库的一部分,它提供了RestTemplate的替代方法。

WebClient 使用 Spring Reactive Framework 所提供的异步非阻塞解决方案为每个事件创建“任务”,Reactive 框架将对这些 “任务” 排队,仅在适当的响应可用时执行它们。

Reactive 框架使用事件驱动的体系结构。它提供了通过 Reactive Streams API 组合异步逻辑的方法,因此可以使用流畅的函数式 API 编写客户端代码,并将响应类型(Mono 和 Flux)作为声明来进行组合。

与同步/阻塞方法相比,WebClient 可以使用更少的线程和系统资源来处理更多的逻辑。

13.2 案例对比

为了体验两种方法间的差异,我们需要使用许多并发客户端请求来运行性能测试。在一定数量的并发请求后,我们将看到阻塞方法性能的显著下降(当然,无论请求数量如何,反应式/非阻塞方法都可以提供恒定的性能)。

就本文而言,我们实现两个 REST 端点,一个使用 RestTemplate,另一个使用 WebClient。他们的任务是调用另一个响应慢的 REST Web 服务,该服务返回一个 Tweet List。

13.2.1 引入依赖项目

首先,我们需要引入 Spring Boot WebFlux starter 依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-webflux</artifactId>
  4. <version>2.5.6</version>
  5. </dependency>

13.2.2 准备慢服务 REST 端点

给已经定义过的 Book 类增加一个构造方法

    public Book(String name) {
        this.name = name;
    }

下面是我们的慢服务 REST 端点:

    @IgnoreRestful
    @GetMapping("/slow-service")
    private List<Book> getAllTweets() throws InterruptedException {
        Thread.sleep(2000L);

        return Arrays.asList(
                new Book("1. RestTemplate rules"),
                new Book("2. WebClient is better"),
                new Book("3. Both are useful"));
    }

13.2.3 使用 RestTemplate 调用慢服务

现在编写一个接口使用 RestTemplate 构造 Web 客户端调用慢服务:

    @GetMapping("/blocking")
    public void accessRestfulBlocking() {
        // URI must be not absolute
        final String uri = "http://localhost:8088/test/slow-service";

        System.out.println("Starting BLOCKING Controller!");

        RestTemplate restTemplate = new RestTemplate();
        ResponseEntity<List<Book>> response = restTemplate.exchange(
                uri, HttpMethod.GET, null,
                new ParameterizedTypeReference<>(){});

        List<Book> result = response.getBody();
        assert result != null;
        result.forEach(item -> System.out.println(item.getName()));

        System.out.println("Exiting BLOCKING Controller!");
    }

调用这个接口,代码将会阻塞以等待来自慢服务的响应。只有当收到响应后,才会执行此方法中的其余代码。下面是这个接口运行完成后在控制台窗口看到的输出内容。可见它中间是同步的方式阻塞等待数据返回。

Starting BLOCKING Controller!
1. RestTemplate rules
2. WebClient is better
3. Both are useful
Exiting BLOCKING Controller!

13.2.4 使用 WebClient 调用慢服务

接下来编写一个接口使用 WebClient 构造 Web 客户端调用慢服务:

    @GetMapping("/non-blocking")
    public void accessRestfulNonBlocking() {
        System.out.println("Starting BLOCKING Controller!");

        Flux<Book> bookFlux = WebClient.create()
                .get()
                .uri("http://localhost:8088/test/slow-service")
                .retrieve()
                .bodyToFlux(Book.class);

        bookFlux.subscribe(item -> System.out.println(item.getName()));

        System.out.println("Exiting BLOCKING Controller!");
    }

调用这个接口,代码将不会阻塞而直接执行完毕。收到慢服务的响应以后会执行此方法中的其余代码。下面是这个接口运行完成后在控制台窗口看到的输出内容。可见它中间是异步的方式非阻塞等待数据返回。

Starting BLOCKING Controller!
Exiting BLOCKING Controller!
1. RestTemplate rules
2. WebClient is better
3. Both are useful

13.3.5 方式选择的原则

RestTemplate 使用 Java Servlet API,因此是同步和阻塞的。反之 WebClient 是异步的,在等待响应返回时不会阻塞正在执行的线程。只有当程序就绪时,才会产生通知。

尽管 RestTemplate 阻塞的方式会让程序的逻辑更加简单。但在某些情况下,与阻塞方法相比,非阻塞方法因为节省系统资源要,因此在高并发和慢服务的场景下是更好的选择。

同时,根据官方的说明,从5.0开始,RestTemplate 处于维护模式(maintenance mode),只有很小的更改和错误请求可以接受。请考虑使用 org.springframework.web.reactive.client.WebClientwhich,它具有更现代的API,支持同步、异步和流式传输场景。

As of 5.0 this class is in maintenance mode, with only minor requests for changes and bugs to be accepted going forward. Please, consider using the org.springframework.web.reactive.client.WebClientwhich has a more modern API and supports sync, async, and streaming scenarios.

因此我们应该选择 WebClient 开发 Web 客户端程序。

13.3 WebClient 的基本开发方法

WebClient 在内部委托给 HTTP 客户端库。 默认情况下,它使用 Netty,同时内置了对 Jetty 反应式HttpClient 的支持。

13.3.1 创建及配置 WebClient

创建 WebClient 的最简单方法是通过静态工厂方法之一:

  1. 直接初始化,不加任何参数

    WebClient client = WebClient.create();
    
  2. 初始化时,提供一个默认的调用地址

    WebClient client = WebClient.create("http://localhost:8088");
    

上面的方法使用默认的 Netty。您还可以将 WebClient.builder() 与其他选项一起使用:

  • uriBuilderFactory:定制的UriBuilderFactory用作基本URL。
  • defaultHeader:每个请求的标题。
  • defaultCookie:每个请求的cookie。
  • defaultRequest:消费者自定义每个请求。
  • filter:针对每个请求的客户端过滤器。
  • exchangeStrategies:HTTP消息读取器/写入器定制。
  • clientConnector:HTTP客户端库设置。


以下示例配置HTTP编解码器:

ExchangeStrategies strategies = ExchangeStrategies.builder()
    .codecs(configurer -> {
          // ...
    })
    .build();

WebClient client = WebClient.builder()
    .exchangeStrategies(strategies)
    .build();

构建好的 WebClient 实例是不可变的,但你可以克隆它并构建修改后的副本而不影响原始实例:

WebClient client1 = WebClient.builder()
    .filter(filterA).filter(filterB).build();

WebClient client2 = client1.mutate()
    .filter(filterC).filter(filterD).build();

// client1 has filterA, filterB
// client2 has filterA, filterB, filterC, filter

要自定义 Netty设置,只需提供一个预先配置的 HttpClient:

HttpClient httpClient = HttpClient.create().secure(sslSpec -> ...);

WebClient webClient = WebClient.builder()
    .clientConnector(new ReactorClientHttpConnector(httpClient))
    .build()

如果要配置连接超时

//通过HttpClient设置超时时间
HttpClient httpClient = HttpClient.create()
    //设置连接超时时间
    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
    //设置响应超时时间
    .responseTimeout(Duration.ofMillis(5000))
    //分别设置读写超时时间
    .doOnConnected(conn -> conn
            .addHandlerLast(new ReadTimeoutHandler(5000, TimeUnit.MILLISECONDS))     
            .addHandlerLast(new WriteTimeoutHandler(5000,TimeUnit.MILLISECONDS))); 

WebClient client = WebClient.builder()
    .clientConnector(new ReactorClientHttpConnector(httpClient))
    .build();

以下示例显示如何使用 Jetty 并定义设置:

HttpClient httpClient = new HttpClient();
httpClient.setCookieStore(...);
ClientHttpConnector connector = new JettyClientHttpConnector(httpClient);

WebClient webClient = WebClient.builder().clientConnector(connector).build()

默认情况下,HttpClient 创建自己的资源,这些资源将保持活动状态,直到进程退出或调用 stop() 为止。

13.3.2 发起GET请求并解析结果

WebClient 发起 get 请求有两种方法:调用 get() 方法或 method(HttpMethod.GET) 方法,之后执行 retrieve()方法可以获取响应主体。

获得响应主体之后,有两种用来解码的类型:Flux<T>Mono<T>

  • Flux可以触发 0 到多个事件,并根据实际情况结束处理或触发错误。
  • Mono最多只触发一个事件。

因为这两种类型之间的简单区别,我们可以简单的认为如果返回的一个单一对象,既可以使用 Mono 也可以使用 Flux,如果是数组或这 List 类型一组数据,则应该使用 Flux。

Flux和Mono的一些操作利用了这个特点在这两种类型间互相转换。例如,调用Flux的single()方法将返回一个Mono,而使用concatWith()方法把两个Mono串在一起就可以得到一个Flux。类似地,有些操作对Mono来说毫无意义(例如take(n)会得到n>1的结果),而有些操作只有作用在Mono上才有意义(例如or(otherMono))。Reactor设计的原则之一是要保持API的精简,而对这两种响应式类型的分离,是表现力与API易用性之间的折中。

对于 /test/first 接口这样以统一的 RestfulResult 格式返回的接口,下面两种方式是等价的

        Flux<RestfulResult> fluxResult = WebClient.create()
                .get()
                .uri("http://localhost:8088/test/first")
                .retrieve()
                .bodyToFlux(RestfulResult.class);

        fluxResult.subscribe(System.out::println);
        Mono<RestfulResult> monoResult = WebClient.create()
                .get()
                .uri("http://localhost:8088/test/first")
                .retrieve()
                .bodyToMono(RestfulResult.class);

        monoResult.subscribe(System.out::println);

但如果把前文案例对比中使用 WebClient 的代码也把 Flux 换成 Mono

        Mono<Book> bookMono = WebClient.create()
                .get()
                .uri("http://localhost:8088/test/slow-service")
                .retrieve()
                .bodyToMono(Book.class);

        bookMono.subscribe(item -> System.out.println(item.getName()));

运行的时候会出现异常,提示的原因为

Cannot deserialize value of type `com.longser.union.cloud.data.model.Book` from Array value

这个提示很明显的说明了 Mono 不能用来处理数组(多个元素)的特点。

当需要给接口传递参数的时候,可以使用如下的方式

在执行 retrieve() 方法之前,还可以通过如下方法设置和请求相关的参数或条件

  • accept()
  • acceptCharset()
  • cookie()
  • cookies()
  • ifModifiedSince()
  • ifNoneMatch()
  • header()
  • headers()
  • attribute()
  • attributes()

    13.3.3 发起POST请求并解析结果

    WebClient 发起 post 请求有两种方法:调用 post() 方法或 method(HttpMethod.POST) 方法。

伴随请求的表单数据用 MultiValueMap 来构造

MultiValueMap<String, String> formData = new LinkedMultiValueMap<>();

formData.add("name","一本好书");
formData.add("price","13.34");
formData.add("isPublic","true");
formData.add("publishDate","2021-11-19");

然后用 BodyInserters.fromFormData() 处理后传递给 body() 方法

Mono<RestfulResult> monoResult = WebClient.create()
    .post()
    .uri("http://localhost:8088/test/book/new")
    .contentType(MediaType.APPLICATION_FORM_URLENCODED)
    .body(BodyInserters.fromFormData(formData))
    .retrieve()
    .bodyToMono(RestfulResult.class);

monoResult.subscribe(System.out::println);

当年你也可以用下面的方式构造请求数据

BodyInserters.FormInserter<String> formInserter =  BodyInserters.fromFormData("name","一本好书")
    .with("price","13.34")
    .with("isPublic","true")
    .with("publishDate","2021-11-19");

然后直接把这个变量传递给 body() 方法

    .body(formInserter)

如果传递的数据类型是 Multipart Data,那么你需要提供一个 MultiValueMap ,其值可以是代表零件内容的对象实例或代表零件内容和标题的 HttpEntity 实例。 MultipartBodyBuilder 提供了一个方便的 API 来构造它。具体的写法如下

MultipartBodyBuilder builder = new MultipartBodyBuilder();

builder.part("name", "一张照片");
builder.part("file", new FileSystemResource("/007.jpg"));

MultiValueMap<String, HttpEntity<?>> parts = builder.build();

Mono<RestfulResult> monoResult = WebClient.create()
    .post()
    .uri("http://localhost:8088/api/file/upload")
    .body(BodyInserters.fromMultipartData(parts))
    .retrieve()
    .bodyToMono(RestfulResult.class);

monoResult.subscribe(System.out::println);

在大多数情况下,您不必为每个部分指定Content-Type。 内容类型是根据选择用于对其进行序列化的HttpMessageWriter自动确定的,对于资源而言,取决于文件扩展名。 如有必要,您可以通过重载的构建器part方法之一显式提供MediaType以供每个零件使用。

让我们在看一个复杂一些的例子:

MultiValueMap<String, String> bodyValues = new LinkedMultiValueMap<>();

bodyValues.add("key", "value");
bodyValues.add("another-key", "another-value");

String response = client.post()
    .uri(new URI("https://httpbin.org/post"))
    .header("Authorization", "Bearer MY_SECRET_TOKEN")
    .contentType(MediaType.APPLICATION_FORM_URLENCODED)
    .accept(MediaType.APPLICATION_JSON)
    .body(BodyInserters.fromFormData(bodyValues))
    .retrieve()
    .bodyToMono(String.class)
    .block();

注意不要使用 syncBody() 方法来传递参数,这个方法已经被废弃。

13.3.4 解析响应的内容

到目前为止,我们一直专注于读取响应正文而忽略了响应头(Headers)等其他数据。有时候我们会需要处理这些数据,此时可以使用 toEntity() 为我们提供的包装了结果数据的 ResponseEntity 对象。下面是具体的写法

Mono<ResponseEntity<RestfulResult>> monoResult = WebClient.create()
    .get()
    .uri("http://localhost:8088/test/first")
    .retrieve()
    .toEntity(RestfulResult.class);

monoResult.subscribe(response -> {
    System.out.println("Response Code: " + response.getStatusCode());
    System.out.println("Response Code Value: " + response.getStatusCodeValue());
    System.out.println("Response Headers: " + response.getHeaders());
    System.out.println("Response Body: " + response.getBody());
});

下面它的输出结果

Response Code: 200 OK
Response Code Value: 200
Response Headers: [Content-Type:"application/json;charset=UTF-8", Transfer-Encoding:"chunked", Date:"Sat, 06 Nov 2021 16:48:04 GMT"]
Response Body: {
  "success" : true,
  "errorCode" : 0,
  "errorMessage" : "",
  "data" : {
    "id" : 1,
    "userName" : "David",
    "nickName" : "Grace Runner",
    "mobile" : "18801681588",
    "password" : "",
    "gender" : 1,
    "degree" : 1
  }
}

除了用 ResponseEntity 封装以外,还可以用下面的方法直接获得并解析响应的内容

Mono<RestfulResult> monoResult = WebClient.create("http://localhost:8088/test/first")
    .get()
    .exchangeToMono(response -> {
        System.out.println("Response Code: " + response.statusCode());
        System.out.println("Response Code Value: " + response.statusCode().value());
        System.out.println("Response Headers: " + response.headers().asHttpHeaders());
        System.out.println("Response Content Type: " + response.headers().contentType());
        System.out.println("Response Header Date: " + response.headers().header("Date"));

        if (response.statusCode().equals(HttpStatus.OK)) {
            return response.bodyToMono(RestfulResult.class);
        } else if (response.statusCode().is4xxClientError()) {
            return Mono.just(RestfulResult.fail(response.statusCode().value(),"Error response"));
        } else {
            return response.createException().flatMap(Mono::error);
        }
    });

monoResult.subscribe(System.out::println);

这段代码可以得到如下的结果

Response Code: 200 OK
Response Code Value: 200
Response Headers: [Content-Type:"application/json;charset=UTF-8", Transfer-Encoding:"chunked", Date:"Sun, 07 Nov 2021 03:14:38 GMT"]
Response Content Type: Optional[application/json;charset=UTF-8]
Response Header Date: [Sun, 07 Nov 2021 03:14:38 GMT]
{
  "success" : true,
  "errorCode" : 0,
  "errorMessage" : "",
  "data" : {
    "id" : 1,
    "userName" : "David",
    "nickName" : "Grace Runner",
    "mobile" : "18801681588",
    "password" : "",
    "gender" : 1,
    "degree" : 1
  }
}

补充说明:

  • 除了 exchangeToMono() 方法以外,你还可以使用 exchangeToFlux() 方法处理多个返回结果。
  • 如果请求方式为 POST ,那么执行 exchangeToMono() 或 exchangeToFlux() 要在 body() 方法之后。

13.3.5 响应4xx和5xx异常状态

默认情况下,具有4xx或5xx状态代码的响应会引发 WebClientResponseException 或其 HTTP 状态特定的子类之一(如 WebClientResponseException.BadRequest、WebClientResponseException.NotFound等),此时响应处理会被中断。你可以使用 onStatus 方法来处理这些状态而不是抛出异常。

处理的时候需要一个如下类型的回调方法

Function<ClientResponse, Mono<? extends Throwable>>

下面的代码用两种不同方式来定义这个方法

    private Mono exceptionFunction(ClientResponse response) {
        System.out.println(response.statusCode());
        return Mono.empty();
    }

    @GetMapping("/get-404")
    public void getNotExist() {
        Mono<RestfulResult> monoResult = WebClient.create()
                .get()
                .uri("http://localhost:8088/test/none")
                .retrieve()
                .onStatus(HttpStatus::is4xxClientError, this::exceptionFunction)
                .onStatus(HttpStatus::is5xxServerError, response -> {
                    System.out.println(response.statusCode());
                    return Mono.empty();
                })
                .bodyToMono(RestfulResult.class);

        monoResult.subscribe(System.out::println);
    }

http://localhost:8088/test/none 是一个不存在的接口,访问它的时候会发生 404 Not Found 异常。由于用 onStatus() 做了处理,整个请求处理过程不会被中断,在控制台窗口可以得到下面的输出

404 NOT_FOUND
{
  "success" : true,
  "errorCode" : 0,
  "errorMessage" : "",
  "data" : {
    "timestamp" : "2021-11-06T16:19:53.067+00:00",
    "status" : 404,
    "error" : "Not Found",
    "path" : "/test/none"
  }
}

如果用 Postman 访问这个不存在的地址,可以得到相同的结果。

13.3.6 下载二进制文件

下面是载下载指定链接图片的示例

String imgUrl = "https://cdn.nlark.com/yuque/0/2021/jpeg/12725763/1618280829183-6607160c-8710-4fdd-b0fd-b54588a03a29.jpeg";
String filename = imgUrl.substring(imgUrl.lastIndexOf("/") + 1);
String extName = imgUrl.substring(imgUrl.lastIndexOf(".") + 1);

Mono<Resource> resp = WebClient.create()
    .get()
    .uri(imgUrl)
    .accept(MediaType.IMAGE_JPEG, MediaType.IMAGE_PNG)
    .retrieve()
    .bodyToMono(Resource.class);

Resource resource = resp.block();
assert resource != null;
BufferedImage bufferedImage = ImageIO.read(resource.getInputStream());
ImageIO.write(bufferedImage, extName, new File(filename));

下面是载下载指定链接文件的示例

String fileUrl = "http://www.bjzgh12351.org/upload/content/article/attachment/160922851463206.doc";
String filename = fileUrl.substring(fileUrl.lastIndexOf("/") + 1);
Path path = Paths.get(filename);

Mono<DataBuffer> resp = WebClient.create()
    .get()
    .uri(fileUrl)
    .accept(MediaType.APPLICATION_OCTET_STREAM, MediaType.ALL)
    .retrieve()
    .bodyToMono(DataBuffer.class);

DataBufferUtils.write(resp, path, StandardOpenOption.CREATE).block();

13.3.6 设置请求过滤器

您可以通过WebClient.Builder注册客户端过滤器(ExchangeFilterFunction),以拦截和修改请求。

WebClient client = WebClient.builder()
    .filter((request, next) -> {
        ClientRequest filtered = ClientRequest.from(request)
            .header("foo", "bar")
            .build();

        return next.exchange(filtered);
    })
    .build();

这可用于跨领域的关注,例如身份验证。 以下示例使用过滤器通过静态工厂方法进行基本身份验证:

WebClient client = WebClient.builder()
    .filter(basicAuthentication("user", "password"))
    .build();

过滤器全局应用于每个请求。 要更改特定请求的过滤器行为,您可以将请求属性添加到ClientRequest,然后链中的所有过滤器都可以访问该请求属性,如以下示例所示:

WebClient client = WebClient.builder()
    .filter((request, next) -> {
        Optional<Object> usr = request.attribute("myAttribute");
        // ...
    })
    .build();

client.get().uri("https://example.org/")
    .attribute("myAttribute", "...")
    .retrieve()
    .bodyToMono(Void.class);

您还可以复制现有的 WebClient,插入新的过滤器或删除已注册的过滤器。 以下示例在索引 0 处插入一个基本身份验证过滤器:

WebClient client = webClient.mutate()
    .filters(filterList -> {
        filterList.add(0, basicAuthentication("user", "password"));
    })
    .build();

13.3.7 以同步阻塞的方式使用

做为一个特别的用法,你能够以同步阻塞的方式使用WebClient:

Person person = client.get().uri("/person/{id}", i).retrieve()
    .bodyToMono(Person.class)
    .block();

List<Person> persons = client.get().uri("/persons").retrieve()
    .bodyToFlux(Person.class)
    .collectList()
    .block();

如果需要进行多次通信,则可以避免单独阻止每个响应,而等待合并的结果,这样会更有效:

Mono<Person> personMono = client.get().uri("/person/{id}", personId)
    .retrieve().bodyToMono(Person.class);

Mono<List<Hobby>> hobbiesMono = client.get().uri("/person/{id}/hobbies", personId)
    .retrieve().bodyToFlux(Hobby.class).collectList();

Map<String, Object> data = Mono.zip(personMono, hobbiesMono, (person, hobbies) -> {
    Map<String, String> map = new LinkedHashMap<>();
    map.put("person", personName);
    map.put("hobbies", hobbies);

    return map;
})
.block();

以上仅是一个示例。 还有许多其他模式和运算符可用于构建响应式管道,该响应式管道可进行许多远程调用(可能是嵌套的,相互依赖的),而不会阻塞到最后。

你永远不必阻塞 Spring MVC 控制器。 只需从控制器方法返回结果 Flux 或 Mono。

13.4 总结与讨论

WebClient 很好的支持了响应式模型,而且 API 设计友好,使用非常方便。WebClient 具有很好的性能和稳定性,被广泛用于需要访问其他服务器 API 的 测试 API 的场合。

13.5 参考资料

版权说明:本文由北京朗思云网科技股份有限公司原创,向互联网开放全部内容但保留所有权力。