一、概述

WebClient是从Spring WebFlux 5.0版本开始提供的一个非阻塞的基于响应式编程的进行Http请求的客户端工具。在内部,WebClient 委托给HTTP客户端库(默认情况下使用 Reactor Netty)。WebClient中提供了标准Http请求方式对应的get、post、put、delete等方法,可以用来发起相应的请求。
Spring 对其描述如下:

Non-blocking, reactive client to perform HTTP requests, exposing a fluent, reactive API over underlying HTTP client libraries such as Reactor Netty.

二、配置

创建WebClient最简单的使用静态工厂方法:

  • WebClient.create()
  • WebClient.create(String baseUrl)

还可以使用 Builder 模式提供进一步选项:

  • uriBuilderFactory: 配置一个基础的 URL 。
  • defaultHeader: 每个request都带有的请求头。
  • defaultCookie: 每个request持有的cookie。
  • defaultRequest: 用户定制任意请求。
  • filter
  • exchangeStrategies: 定制HTTP 消息读取/写入。
  • clientConnector: HTTP 客户端库设置。
    1. // 配置HTTP编码器
    2. WebClient client = WebClient.builder()
    3. .exchangeStrategies(builder -> {
    4. return builder.codecs(codecConfigurer -> {
    5. //...
    6. });
    7. })
    8. .build();
    WebClient一旦配置好就是不可变的,可以克隆它构建一个修改后的副本而不影响实例。 ```java WebClient client1 = WebClient.builder()
    1. .filter(filterA).filter(filterB).build();

WebClient client2 = client1.mutate() .filter(filterC).filter(filterD).build();

<a name="1ZLvc"></a>
## 2.1 MaxInMemorySize
默认是`256KB`,可以通过以下配置限制
```java
WebClient webClient = WebClient.builder()
        .exchangeStrategies(builder ->
            builder.codecs(codecs ->
                codecs.defaultCodecs().maxInMemorySize(2 * 1024 * 1024)
            )
        )
        .build();

2.2 Resource

如果服务器与进程同步,通常不需要显式关闭。然后,如果服务器可以启动或停止,比如由Spring容器管理,那么可以声明一个类型为 ReactorResourceFactory 的 Spring 管理 bean,其值为globalResources = true (默认值) ,以确保在Spring ApplicationContext 关闭时Reactor Netty全局资源被关闭,

@Bean
public ReactorResourceFactory reactorResourceFactory() {
    return new ReactorResourceFactory();
}

当然,可以将globalResources = true 设置为 false ,这样,确保所有 Reactor Netty 客户端和服务端实例使用共享资源的责任在于你。

@Bean
public ReactorResourceFactory resourceFactory() {
    ReactorResourceFactory factory = new ReactorResourceFactory();
    factory.setUseGlobalResources(false); 
    return factory;
}

@Bean
public WebClient webClient() {

    Function<HttpClient, HttpClient> mapper = client -> {
        // Further customizations...
    };

    ClientHttpConnector connector =
            new ReactorClientHttpConnector(resourceFactory(), mapper); 

    return WebClient.builder().clientConnector(connector).build(); 
}

2.3 Timeouts 超时

2.3.1 连接超时

import io.netty.channel.ChannelOption;

HttpClient httpClient = HttpClient.create()
        .tcpConfiguration(client ->
                client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000));

2.3.2 读取/写入超时

import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;

HttpClient httpClient = HttpClient.create()
        .tcpConfiguration(client ->
                client.doOnConnected(conn -> conn
                        .addHandlerLast(new ReadTimeoutHandler(10))
                        .addHandlerLast(new WriteTimeoutHandler(10))));

2.4 配置Jetty

@Bean
public JettyResourceFactory resourceFactory() {
    return new JettyResourceFactory();
}

@Bean
public WebClient webClient() {

    HttpClient httpClient = new HttpClient();
    // Further customizations...

    ClientHttpConnector connector =
            new JettyClientHttpConnector(httpClient, resourceFactory()); 

    return WebClient.builder().clientConnector(connector).build(); 
}

三、Retrieve()

retrieve() 方法是获取响应体并对其解码的最简单的方法。

WebClient client = WebClient.create("https://example.org");

Mono<Person> result = client.get()
        .uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
        .retrieve()
        .bodyToMono(Person.class);

可以使用 onStatus 方法处理特定类型异常:

Mono<Person> result = client.get()
        .uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
        .retrieve()
        .onStatus(HttpStatus::is4xxClientError, response -> ...)
        .onStatus(HttpStatus::is5xxServerError, response -> ...)
        .bodyToMono(Person.class);

四、exchange()

exchange()retrieve() 方法提供更多的控制。可以获取完成的对象。

Mono<Person> result = client.get()
        .uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
        .exchange()
        .flatMap(response -> response.bodyToMono(Person.class));

exchange()retrieve() 方法不同的是 exchange() 没有提供 4xx5xx 错误响应API,你必须检查响应状态码从而决定下一步操作。

五、Request Body

请求体可以从 ReactiveAdapterRegistry 处理的任何异步类型中进行编码,如`Mono。

Mono<Person> personMono = ... ;

Mono<Void> result = client.post()
        .uri("/persons/{id}", id)
        .contentType(MediaType.APPLICATION_JSON)
        .body(personMono, Person.class)
        .retrieve()
        .bodyToMono(Void.class);

或者被编码成对象流:

Flux<Person> personFlux = ... ;

Mono<Void> result = client.post()
        .uri("/persons/{id}", id)
        .contentType(MediaType.APPLICATION_STREAM_JSON)
        .body(personFlux, Person.class)
        .retrieve()
        .bodyToMono(Void.class);

当然,如果你有实例的值,也可以使用 bodyValue

Person person = ... ;

Mono<Void> result = client.post()
        .uri("/persons/{id}", id)
        .contentType(MediaType.APPLICATION_JSON)
        .bodyValue(person)
        .retrieve()
        .bodyToMono(Void.class);

5.1 表单数据

可以提供 MultiValueMap<String, String> 作为请求体。注意,content-type会自动通过 FormHttpMessageWriter 设置为 application/x-www-form-urlencoded

MultiValueMap<String, String> formData = ... ;

Mono<Void> result = client.post()
        .uri("/path", id)
        .bodyValue(formData)
        .retrieve()
        .bodyToMono(Void.class);

也可以使用内联API设置body

import static org.springframework.web.reactive.function.BodyInserters.*;

Mono<Void> result = client.post()
        .uri("/path", id)
        .body(fromFormData("k1", "v1").with("k2", "v2"))
        .retrieve()
        .bodyToMono(Void.class);

5.2 Multipart数据类型

需要提供 MultiValueMap<String,?> 值是表示部分内容的 Object 实例或表示部分内容和头的 HttpEntity 实例。

MultipartBodyBuilder builder = new MultipartBodyBuilder();
builder.part("fieldPart", "fieldValue");
builder.part("filePart1", new FileSystemResource("...logo.png"));
builder.part("jsonPart", new Person("Jason"));
builder.part("myPart", part); // Part from a server request

MultiValueMap<String, HttpEntity<?>> parts = builder.build();

在大多数情况下,您不必为每个部件指定 Content-Type。内容类型是根据序列化它所选择的 HttpMessageWriter自动确定的,对于 Resource,则是根据文件扩展名确定的。如果有必要,可以通过一个重载的构建器部件方法显式提供 MediaType用于每个部件。

MultipartBodyBuilder builder = ...;
// 
Mono<Void> result = client.post()
        .uri("/path", id)
        .body(builder.build())
        .retrieve()
        .bodyToMono(Void.class);

还可以通过内联方式

import static org.springframework.web.reactive.function.BodyInserters.*;

Mono<Void> result = client.post()
        .uri("/path", id)
        .body(fromMultipartData("fieldPart", "value").with("filePart", resource))
        .retrieve()
        .bodyToMono(Void.class);

六、客户端过滤器

通过 WebClient.Builder 注册一个客户端过滤器(ExchangeFileterFunction) ,目的是为了拦截和修改请求。

WebClient client = WebClient.builder()
        .filter((request, next) -> {

            ClientRequest filtered = ClientRequest.from(request)
                    .header("foo", "bar")
                    .build();

            return next.exchange(filtered);
        })
        .build();

比如身份验证。
过滤器应用于全局的每个请求。你可以向ClientRequest添加请求属性,以便链路()的所有过滤器都可以访问这些属性。

WebClient client = WebClient.builder()
        .filter((request, next) -> {
            Optional<Object> usr = request.attribute("myAttribute");
            // ...
        })
        .build();

client.get().uri("https://example.org/")
        .attribute("myAttribute", "...")
        .retrieve()
        .bodyToMono(Void.class);

    }

七、同步使用

Person person = client.get().uri("/person/{id}", i).retrieve()
    .bodyToMono(Person.class)
    .block();

List<Person> persons = client.get().uri("/persons").retrieve()
    .bodyToFlux(Person.class)
    .collectList()
    .block();

然而,如果需要进行多个调用,避免单独阻塞每个响应更有效,而是等待组合结果:

Mono<Person> personMono = client.get().uri("/person/{id}", personId)
        .retrieve().bodyToMono(Person.class);

Mono<List<Hobby>> hobbiesMono = client.get().uri("/person/{id}/hobbies", personId)
        .retrieve().bodyToFlux(Hobby.class).collectList();

Map<String, Object> data = Mono.zip(personMono, hobbiesMono, (person, hobbies) -> {
            Map<String, String> map = new LinkedHashMap<>();
            map.put("person", person);
            map.put("hobbies", hobbies);
            return map;
        })
        .block();

以上只是一个例子,还有许多其他模式和操作符用于组合一个响应式管道,从而可以进行许多远程调用。

八、测试

可以使用模拟 Web 服务器,如 OkHttp MockWebServer

  1. 导入 Maven 依赖 ```xml com.squareup.okhttp3 mockwebserver 3.14.9 test

io.projectreactor reactor-test 3.3.9.RELEASE test


2. 创建测试类
```java
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@ParameterizedTest(name = "[{index}] webClient [{0}]")
@MethodSource("arguments")
@interface ParameterizedWebClientTest {
}

static Stream<ClientHttpConnector> arguments() {
    return Stream.of(
        new ReactorClientHttpConnector()
    );
}


private MockWebServer server;

private WebClient webClient;


private void startServer(ClientHttpConnector connector) {
    this.server = new MockWebServer();
    this.webClient = WebClient
        .builder()
        .clientConnector(connector)
        .baseUrl(this.server.url("/").toString())
        .build();
}

@AfterEach
void shutdown() throws IOException {
    this.server.shutdown();
}


private void prepareResponse(Consumer<MockResponse> consumer) {
    MockResponse response = new MockResponse();
    consumer.accept(response);
    this.server.enqueue(response);
}
  1. 编写测试类

    @ParameterizedWebClientTest
    void retrieve(ClientHttpConnector connector) {
     startServer(connector);
    
     prepareResponse(response -> response.setHeader("Content-Type", "text/plain")
                     .addHeader("Set-Cookie", "testkey1=testvalue1;")
                     .addHeader("Set-Cookie", "testkey2=testvalue2; Max-Age=42; HttpOnly; Secure")
                     .setBody("Hello Spring!"));
    
     Mono<String> result = this.webClient.get()
         .uri("/greeting")
         .cookie("testkey", "testvalue")
         .header("X-Test-Header", "testvalue")
         .retrieve()
         .bodyToMono(String.class);
    
     StepVerifier.create(result)
         .expectNext("Hello Spring!")
         .expectComplete()
         .verify(Duration.ofSeconds(3));
    
     expectRequestCount(1);
     expectRequest(request -> {
         assertThat(request.getHeader(HttpHeaders.COOKIE)).isEqualTo("testkey=testvalue");
         assertThat(request.getHeader("X-Test-Header")).isEqualTo("testvalue");
         assertThat(request.getHeader(HttpHeaders.ACCEPT)).isEqualTo("*/*");
         assertThat(request.getPath()).isEqualTo("/greeting");
     });
    }