官方文档地址:Performing requests


一旦创建了RestClient,就可以通过调用performRequestperformRequestAsync来发送请求。performRequest是同步的,它将阻塞调用线程,并在请求成功时返回响应,或者在请求失败时抛出异常。performRequestAsync是异步的,它接受一个ResponseListener参数,当请求成功时,使用Response调用该参数,如果请求失败,则使用Exception调用该参数。

这是同步的:

  1. Request request = new Request(
  2. //HTTP 方法(GET, POST, HEAD 等)
  3. "GET",
  4. //服务器地址
  5. "/");
  6. Response response = restClient.performRequest(request);

这是异步的:

  1. Request request = new Request(
  2. //HTTP 方法(GET, POST, HEAD 等)
  3. "GET",
  4. //服务器地址
  5. "/");
  6. Cancellable cancellable = restClient.performRequestAsync(request,
  7. new ResponseListener() {
  8. @Override
  9. public void onSuccess(Response response) {
  10. //处理响应
  11. }
  12. @Override
  13. public void onFailure(Exception exception) {
  14. //处理失败
  15. }
  16. });

可以向请求对象添加请求参数:

  1. request.addParameter("pretty", "true");

可以将请求体设置为任何HttpEntity

  1. request.setEntity(new NStringEntity(
  2. "{\"json\":\"text\"}",
  3. ContentType.APPLICATION_JSON));

HttpEntity指定的ContentType很重要,因为它将用于设置Content-Type报头,以便 Elasticsearch 能够正确解析内容。

您也可以将它设置为一个String,默认ContentTypeapplication/json

  1. request.setJsonEntity("{\"json\":\"text\"}");

RequestOptions

RequestOptions类包含在同一应用程序中的多个请求之间共享的请求部分设置。 您可以创建一个单例实例并在所有请求之间共享它:

  1. private static final RequestOptions COMMON_OPTIONS;
  2. static {
  3. RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
  4. //添加所有请求所需的请求头
  5. builder.addHeader("Authorization", "Bearer " + TOKEN);
  6. //自定义响应使用者
  7. builder.setHttpAsyncResponseConsumerFactory(
  8. new HttpAsyncResponseConsumerFactory
  9. .HeapBufferedResponseConsumerFactory(30 * 1024 * 1024 * 1024));
  10. COMMON_OPTIONS = builder.build();
  11. }

addHeader用于授权或使用 Elasticsearch 前面的代理所需的报头。 无需设置Content-Type报头,因为客户端会自动从附加到请求的HttpEntity中设置该报头。

您可以设置NodeSelector来控制哪些节点将接收请求。 NodeSelector.SKIP_DEDICATED_MASTERS是一个不错的选择。

您还可以定制用于缓冲异步响应的响应使用者。默认使用者将在 JVM 堆上缓冲最大 100MB 的响应。如果响应较大,则请求将会失败。例如,您可以降低最大大小,如果您在上面示例这样的堆约束环境中运行,这可能会很有用。

一旦您创建了单例,就可以在发出请求时使用它:

  1. request.setOptions(COMMON_OPTIONS);

您还可以根据每个请求定制这些选项。例如,这里增加一个额外的报头:

  1. RequestOptions.Builder options = COMMON_OPTIONS.toBuilder();
  2. options.addHeader("cats", "knock things off of other things");
  3. request.setOptions(options);

多个并行异步操作

客户端很乐意并行执行多个操作。下面的示例并行地索引多个文档。在实际场景中,您可能更倾向于使用_bulk API 来实现,所以在这里本示例仅供说明。

  1. final CountDownLatch latch = new CountDownLatch(documents.length);
  2. for (int i = 0; i < documents.length; i++) {
  3. Request request = new Request("PUT", "/posts/doc/" + i);
  4. //让我们假设文档存储在 HttpEntity 数组中
  5. request.setEntity(documents[i]);
  6. restClient.performRequestAsync(
  7. request,
  8. new ResponseListener() {
  9. @Override
  10. public void onSuccess(Response response) {
  11. //处理返回的响应
  12. latch.countDown();
  13. }
  14. @Override
  15. public void onFailure(Exception exception) {
  16. //处理由于通信错误或带有指示错误的状态码的响应而返回的异常
  17. latch.countDown();
  18. }
  19. }
  20. );
  21. }
  22. latch.await();

取消异步请求

performRequestAsync方法返回一个Cancelable对象,该类有一个名为cancel的公共方法。可以通过调用该方法来取消正在进行的请求。取消请求将通过底层 http 客户端终止 http 请求。在服务器端,这个操作不会自动转换为取消请求的执行,这需要在 API 本身中具体实现。

Cancelable实例的使用是可选的,如果你不需要它,你可以忽略它。一个典型的用例是将它与 Rx Java 或 Kotlin 的suspendCancellableCoRoutine这样的框架一起使用。取消不再需要的请求是避免给 Elasticsearch 增加不必要负载的好方法。

  1. Request request = new Request("GET", "/posts/_search");
  2. Cancellable cancellable = restClient.performRequestAsync(
  3. request,
  4. new ResponseListener() {
  5. @Override
  6. public void onSuccess(Response response) {
  7. //处理在取消请求之前已经执行成功并返回的响应
  8. }
  9. @Override
  10. public void onFailure(Exception exception) {
  11. //处理返回的异常,当请求被取消时,该异常很可能是 CancellationException
  12. }
  13. }
  14. );
  15. cancellable.cancel();