Elasticsearch提供了2种REST客户端,一种是低级客户端,—种是高级客户端

  • Java Low Level REST Client:官方提供的低级客户端。该客户端通过http来连接 Elasticsearch集群。用户在使用该客户端时需要将请求数据手动拼接成 Elasticsearch所需JSON格式进行发送,收到响应时同样也需要将返回的JSON数据手动封装成对象。虽然麻烦,不过该客户端兼容所有的 Elasticsearch版本
  • Java High Level REST Client:官方提供的高级客户端。该客户端基于低级客户端实现,它提供了很多便捷的

API来解决低级客户端需要手动转换数据格式的问题。

低级和高级客户端创建

High-level client 会依赖 Low-level client 来执行请求, low-level client 则会维护一个请求的线程连接池,因为当 high-level 请求处理结束时,应该 close 掉这个连接,使 low-level client 能尽快释放资源。

  1. client.close();
  1. import org.apache.http.HttpHost;
  2. import org.apache.http.client.config.RequestConfig;
  3. import org.elasticsearch.client.*;
  4. import org.springframework.beans.factory.annotation.Value;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. @Configuration
  8. public class ElasticsearchConfig {
  9. @Value("${mylasticsearch.elasticsearch.hostlist}")
  10. private String hostlist;
  11. @Bean
  12. public RestHighLevelClient restHighLevelClient() {
  13. // 解析hostlist 信息
  14. String[] split = hostlist.split(",");
  15. // 创建HttpHost数组 封装es的主机和端口
  16. HttpHost[] httpHosts = new HttpHost[split.length];
  17. for (int i = 0; i < split.length; i++) {
  18. String iterm = split[i];
  19. httpHosts[i] = new HttpHost(iterm.split(":")[0], Integer.parseInt(iterm.split(":")[1]), "http");
  20. }
  21. RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
  22. restClientBuilder.setFailureListener(new RestClient.FailureListener() {
  23. @Override
  24. public void onFailure(Node node) {
  25. super.onFailure(node);
  26. System.out.println("出错的节点: " + node);
  27. }
  28. });
  29. // 定义节点选择器 这个是跳过data=false,ingest为false的节点
  30. restClientBuilder.setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS);
  31. // 定义默认请求配置回调
  32. restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
  33. @Override
  34. public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
  35. return requestConfigBuilder.setConnectTimeout(90000) // 连接超时(默认为1秒)
  36. .setSocketTimeout(30000); // 套接字超时(默认为30秒)
  37. }
  38. });
  39. //设置线程数
  40. //一般线程数与本地检测到的处理器数量相同,线程数主要取决于Runtime.getRuntime().availableProcessors()返回的结果
  41. int number = 3;
  42. restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
  43. @Override
  44. public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
  45. return httpAsyncClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(number).build());
  46. }
  47. });
  48. return new RestHighLevelClient(restClientBuilder);
  49. }
  50. ///项目主要使用RestHighLevelClient,对于低级的客户端暂时不用
  51. // 注意执行完要把这个对象.close();
  52. @Bean
  53. public RestClient restClient() {
  54. // 解析hostlist 信息
  55. String[] split = hostlist.split(",");
  56. // 创建HttpHost数组 封装es的主机和端口
  57. HttpHost[] httpHosts = new HttpHost[split.length];
  58. for (int i = 0; i < split.length; i++) {
  59. String iterm = split[i];
  60. httpHosts[i] = new HttpHost(iterm.split(":")[0], Integer.parseInt(iterm.split(":")[1]), "http");
  61. }
  62. RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
  63. restClientBuilder.setFailureListener(new RestClient.FailureListener() {
  64. @Override
  65. public void onFailure(Node node) {
  66. super.onFailure(node);
  67. System.out.println("出错的节点: " + node);
  68. }
  69. });
  70. // 定义节点选择器 这个是跳过data=false,ingest为false的节点
  71. restClientBuilder.setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS);
  72. // 定义默认请求配置回调
  73. restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
  74. @Override
  75. public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
  76. return requestConfigBuilder.setConnectTimeout(90000) // 连接超时(默认为1秒)
  77. .setSocketTimeout(30000); // 套接字超时(默认为30秒)
  78. }
  79. });
  80. //设置线程数
  81. //一般线程数与本地检测到的处理器数量相同,线程数主要取决于Runtime.getRuntime().availableProcessors()返回的结果
  82. int number = 3;
  83. restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
  84. @Override
  85. public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
  86. return httpAsyncClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(number).build());
  87. }
  88. });
  89. return restClientBuilder.build();
  90. }
  91. }

或者

@Configuration
public class ElasticSearchClientConfig {

    @Bean
    public RestHighLevelClient restHighLevelClient(){
        return new RestHighLevelClient(
                RestClient.builder(
                    //有几个集群写几个!!
                        new HttpHost("127.0.0.1",9200,"http"),
                        new HttpHost("127.0.0.1",9100,"http")
                )
        );
    }
}

账号密码

@Configuration
public class EsClientConfiguration {
    @Value("${spring.elasticsearch.rest.uris}")
    private String[] esUris;
    @Value("${spring.elasticsearch.rest.connection-timeout}")
    private  int connectTimeOut; // 连接超时时间
    @Value("${spring.elasticsearch.rest.max-connection}")
    private  int maxConnection; //最大连接数
    @Autowired
    private Environment environment;
    @Bean
    public RestHighLevelClient client() {
        String userName = environment.getProperty("spring.elasticsearch.rest.username");
        String password = environment.getProperty("spring.elasticsearch.rest.password");
        HttpHost[] httpHosts = new HttpHost[esUris.length];
        //将地址转换为http主机数组,未配置端口则采用默认9200端口,配置了端口则用配置的端口
        for (int i = 0; i < httpHosts.length; i++) {
            if (!StringUtils.isEmpty(esUris[i])) {
                if (esUris[i].contains(":")) {
                    String[] uris = esUris[i].split(":");
                    httpHosts[i] = new HttpHost(uris[0], Integer.parseInt(uris[1]), "http");
                } else {
                    httpHosts[i] = new HttpHost(esUris[i], 9200, "http");
                }
            }
        }
        //判断,如果未配置用户名,则进行无用户名密码连接,配置了用户名,则进行用户名密码连接
        if (StringUtils.isEmpty(userName)) {
            RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(httpHosts));
            return client;
        } else {

            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY,
                    //es账号密码
                    new UsernamePasswordCredentials(userName, password));
            RestHighLevelClient client = new RestHighLevelClient(
                    RestClient.builder(httpHosts)
                            .setHttpClientConfigCallback((httpClientBuilder) -> {
                                httpClientBuilder.setMaxConnTotal(maxConnection);
                                httpClientBuilder.disableAuthCaching();
                                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);

                                return httpClientBuilder;
                            })
                            .setRequestConfigCallback(builder -> {
                                builder.setConnectTimeout(connectTimeOut);

                                return builder;
                            })
            );
            return client;
        }
    }
}

配置嗅探器

嗅探器允许自动发现运行中的Elasticsearch集群中的节点,并将其设置为现有的RestClient实例。在默认情况下,嗅探器使用nodes info API检索属于集群的节点,并使用jackson解析获得的JSON响应。目前,嗅探器与Elasticsearch 2.X及更高版本兼容。在使用嗅探器之前需添加相关的依赖,代码如下所示:
image.png
在创建好RestClient实例(如初始化中代码所示)后,就可以将嗅探器与其进行关联了。嗅探器利用RestClient提供的定期机制(在默认情况下定期时间为5min),从集群中获取当前节点的列表,并通过调用RestClient类中的setNodes方法来更新它们。嗅探器的使用代码详见ServiceImpl实现层的MeetElasticSearchServiceImpl类,部分代码如下所示:
image.png
当然,除在客户端启动时配置嗅探器外,还可以在失败时启用嗅探器。这意味着在每次失败后,节点列表都会立即更新,而不是在接下来的普通嗅探循环中更新。
在这种情况下,首先需要创建一个SniffOnFailureListener,然后在创建RestClient时配置。在创建嗅探器后,同一个SniffOnFailureListener实例会相互关联,以便在每次失败时都通知该实例,并使用嗅探器执行嗅探动作。
嗅探器SniffOnFailureListener的使用代码详见ServiceImpl实现层的MeetElasticSearchServiceImpl类,部分代码如下所示:
image.png
由于Elasticsearch节点信息API不会返回连接到节点时要使用的协议,而是只返回它们的host:port,因此在默认情况下会使用HTTP。如果需要使用HTTPS,则必须手动创建并提供ElasticSearchNodesNiffer实例,相关代码如下所示:
image.png