一、概述
WebClient
是从Spring WebFlux 5.0
版本开始提供的一个非阻塞
的基于响应式编程
的进行Http
请求的客户端工具。在内部,WebClient 委托给HTTP
客户端库(默认情况下使用 Reactor Netty)。WebClient
中提供了标准Http
请求方式对应的get、post、put、delete
等方法,可以用来发起相应的请求。Spring
对其描述如下:
Non-blocking, reactive client to perform HTTP requests, exposing a fluent, reactive API over underlying HTTP client libraries such as Reactor Netty.
二、配置
创建WebClient
最简单的使用静态工厂方法:
WebClient.create()
WebClient.create(String baseUrl)
还可以使用 Builder
模式提供进一步选项:
uriBuilderFactory:
配置一个基础的 URL 。defaultHeader:
每个request
都带有的请求头。defaultCookie:
每个request
持有的cookie。defaultRequest:
用户定制任意请求。filter
exchangeStrategies:
定制HTTP 消息读取/写入。clientConnector:
HTTP 客户端库设置。// 配置HTTP编码器
WebClient client = WebClient.builder()
.exchangeStrategies(builder -> {
return builder.codecs(codecConfigurer -> {
//...
});
})
.build();
WebClient
一旦配置好就是不可变的,可以克隆它构建一个修改后的副本而不影响实例。 ```java WebClient client1 = WebClient.builder().filter(filterA).filter(filterB).build();
WebClient client2 = client1.mutate() .filter(filterC).filter(filterD).build();
<a name="1ZLvc"></a>
## 2.1 MaxInMemorySize
默认是`256KB`,可以通过以下配置限制
```java
WebClient webClient = WebClient.builder()
.exchangeStrategies(builder ->
builder.codecs(codecs ->
codecs.defaultCodecs().maxInMemorySize(2 * 1024 * 1024)
)
)
.build();
2.2 Resource
如果服务器与进程同步,通常不需要显式关闭。然后,如果服务器可以启动或停止,比如由Spring容器管理,那么可以声明一个类型为 ReactorResourceFactory 的 Spring 管理 bean,其值为globalResources = true
(默认值) ,以确保在Spring ApplicationContext
关闭时Reactor Netty
全局资源被关闭,
@Bean
public ReactorResourceFactory reactorResourceFactory() {
return new ReactorResourceFactory();
}
当然,可以将globalResources = true
设置为 false
,这样,确保所有 Reactor Netty
客户端和服务端实例使用共享资源的责任在于你。
@Bean
public ReactorResourceFactory resourceFactory() {
ReactorResourceFactory factory = new ReactorResourceFactory();
factory.setUseGlobalResources(false);
return factory;
}
@Bean
public WebClient webClient() {
Function<HttpClient, HttpClient> mapper = client -> {
// Further customizations...
};
ClientHttpConnector connector =
new ReactorClientHttpConnector(resourceFactory(), mapper);
return WebClient.builder().clientConnector(connector).build();
}
2.3 Timeouts 超时
2.3.1 连接超时
import io.netty.channel.ChannelOption;
HttpClient httpClient = HttpClient.create()
.tcpConfiguration(client ->
client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000));
2.3.2 读取/写入超时
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
HttpClient httpClient = HttpClient.create()
.tcpConfiguration(client ->
client.doOnConnected(conn -> conn
.addHandlerLast(new ReadTimeoutHandler(10))
.addHandlerLast(new WriteTimeoutHandler(10))));
2.4 配置Jetty
@Bean
public JettyResourceFactory resourceFactory() {
return new JettyResourceFactory();
}
@Bean
public WebClient webClient() {
HttpClient httpClient = new HttpClient();
// Further customizations...
ClientHttpConnector connector =
new JettyClientHttpConnector(httpClient, resourceFactory());
return WebClient.builder().clientConnector(connector).build();
}
三、Retrieve()
retrieve()
方法是获取响应体并对其解码的最简单的方法。
WebClient client = WebClient.create("https://example.org");
Mono<Person> result = client.get()
.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(Person.class);
可以使用 onStatus
方法处理特定类型异常:
Mono<Person> result = client.get()
.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
.retrieve()
.onStatus(HttpStatus::is4xxClientError, response -> ...)
.onStatus(HttpStatus::is5xxServerError, response -> ...)
.bodyToMono(Person.class);
四、exchange()
exchange()
比 retrieve()
方法提供更多的控制。可以获取完成的对象。
Mono<Person> result = client.get()
.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
.exchange()
.flatMap(response -> response.bodyToMono(Person.class));
exchange()
与retrieve()
方法不同的是exchange()
没有提供4xx
、5xx
错误响应API,你必须检查响应状态码从而决定下一步操作。
五、Request Body
请求体可以从 ReactiveAdapterRegistry
处理的任何异步类型中进行编码,如`Mono。
Mono<Person> personMono = ... ;
Mono<Void> result = client.post()
.uri("/persons/{id}", id)
.contentType(MediaType.APPLICATION_JSON)
.body(personMono, Person.class)
.retrieve()
.bodyToMono(Void.class);
或者被编码成对象流:
Flux<Person> personFlux = ... ;
Mono<Void> result = client.post()
.uri("/persons/{id}", id)
.contentType(MediaType.APPLICATION_STREAM_JSON)
.body(personFlux, Person.class)
.retrieve()
.bodyToMono(Void.class);
当然,如果你有实例的值,也可以使用 bodyValue
Person person = ... ;
Mono<Void> result = client.post()
.uri("/persons/{id}", id)
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(person)
.retrieve()
.bodyToMono(Void.class);
5.1 表单数据
可以提供 MultiValueMap<String, String>
作为请求体。注意,content-type
会自动通过 FormHttpMessageWriter
设置为 application/x-www-form-urlencoded
。
MultiValueMap<String, String> formData = ... ;
Mono<Void> result = client.post()
.uri("/path", id)
.bodyValue(formData)
.retrieve()
.bodyToMono(Void.class);
也可以使用内联API设置body
import static org.springframework.web.reactive.function.BodyInserters.*;
Mono<Void> result = client.post()
.uri("/path", id)
.body(fromFormData("k1", "v1").with("k2", "v2"))
.retrieve()
.bodyToMono(Void.class);
5.2 Multipart数据类型
需要提供 MultiValueMap<String,?>
值是表示部分内容的 Object 实例或表示部分内容和头的 HttpEntity 实例。
MultipartBodyBuilder builder = new MultipartBodyBuilder();
builder.part("fieldPart", "fieldValue");
builder.part("filePart1", new FileSystemResource("...logo.png"));
builder.part("jsonPart", new Person("Jason"));
builder.part("myPart", part); // Part from a server request
MultiValueMap<String, HttpEntity<?>> parts = builder.build();
在大多数情况下,您不必为每个部件指定 Content-Type
。内容类型是根据序列化它所选择的 HttpMessageWriter
自动确定的,对于 Resource,则是根据文件扩展名确定的。如果有必要,可以通过一个重载的构建器部件方法显式提供 MediaType用于每个部件。
MultipartBodyBuilder builder = ...;
//
Mono<Void> result = client.post()
.uri("/path", id)
.body(builder.build())
.retrieve()
.bodyToMono(Void.class);
还可以通过内联方式
import static org.springframework.web.reactive.function.BodyInserters.*;
Mono<Void> result = client.post()
.uri("/path", id)
.body(fromMultipartData("fieldPart", "value").with("filePart", resource))
.retrieve()
.bodyToMono(Void.class);
六、客户端过滤器
通过 WebClient.Builder
注册一个客户端过滤器(ExchangeFileterFunction
) ,目的是为了拦截和修改请求。
WebClient client = WebClient.builder()
.filter((request, next) -> {
ClientRequest filtered = ClientRequest.from(request)
.header("foo", "bar")
.build();
return next.exchange(filtered);
})
.build();
比如身份验证。
过滤器应用于全局的每个请求。你可以向ClientRequest
添加请求属性,以便链路()的所有过滤器都可以访问这些属性。
WebClient client = WebClient.builder()
.filter((request, next) -> {
Optional<Object> usr = request.attribute("myAttribute");
// ...
})
.build();
client.get().uri("https://example.org/")
.attribute("myAttribute", "...")
.retrieve()
.bodyToMono(Void.class);
}
七、同步使用
Person person = client.get().uri("/person/{id}", i).retrieve()
.bodyToMono(Person.class)
.block();
List<Person> persons = client.get().uri("/persons").retrieve()
.bodyToFlux(Person.class)
.collectList()
.block();
然而,如果需要进行多个调用,避免单独阻塞每个响应更有效,而是等待组合结果:
Mono<Person> personMono = client.get().uri("/person/{id}", personId)
.retrieve().bodyToMono(Person.class);
Mono<List<Hobby>> hobbiesMono = client.get().uri("/person/{id}/hobbies", personId)
.retrieve().bodyToFlux(Hobby.class).collectList();
Map<String, Object> data = Mono.zip(personMono, hobbiesMono, (person, hobbies) -> {
Map<String, String> map = new LinkedHashMap<>();
map.put("person", person);
map.put("hobbies", hobbies);
return map;
})
.block();
以上只是一个例子,还有许多其他模式和操作符用于组合一个响应式管道,从而可以进行许多远程调用。
八、测试
可以使用模拟 Web 服务器,如 OkHttp MockWebServer,
- 导入
Maven
依赖 ```xmlcom.squareup.okhttp3 mockwebserver 3.14.9 test
2. 创建测试类
```java
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@ParameterizedTest(name = "[{index}] webClient [{0}]")
@MethodSource("arguments")
@interface ParameterizedWebClientTest {
}
static Stream<ClientHttpConnector> arguments() {
return Stream.of(
new ReactorClientHttpConnector()
);
}
private MockWebServer server;
private WebClient webClient;
private void startServer(ClientHttpConnector connector) {
this.server = new MockWebServer();
this.webClient = WebClient
.builder()
.clientConnector(connector)
.baseUrl(this.server.url("/").toString())
.build();
}
@AfterEach
void shutdown() throws IOException {
this.server.shutdown();
}
private void prepareResponse(Consumer<MockResponse> consumer) {
MockResponse response = new MockResponse();
consumer.accept(response);
this.server.enqueue(response);
}
编写测试类
@ParameterizedWebClientTest void retrieve(ClientHttpConnector connector) { startServer(connector); prepareResponse(response -> response.setHeader("Content-Type", "text/plain") .addHeader("Set-Cookie", "testkey1=testvalue1;") .addHeader("Set-Cookie", "testkey2=testvalue2; Max-Age=42; HttpOnly; Secure") .setBody("Hello Spring!")); Mono<String> result = this.webClient.get() .uri("/greeting") .cookie("testkey", "testvalue") .header("X-Test-Header", "testvalue") .retrieve() .bodyToMono(String.class); StepVerifier.create(result) .expectNext("Hello Spring!") .expectComplete() .verify(Duration.ofSeconds(3)); expectRequestCount(1); expectRequest(request -> { assertThat(request.getHeader(HttpHeaders.COOKIE)).isEqualTo("testkey=testvalue"); assertThat(request.getHeader("X-Test-Header")).isEqualTo("testvalue"); assertThat(request.getHeader(HttpHeaders.ACCEPT)).isEqualTo("*/*"); assertThat(request.getPath()).isEqualTo("/greeting"); }); }