Elasticsearch提供了2种REST客户端,一种是低级客户端,—种是高级客户端
- Java Low Level REST Client:官方提供的低级客户端。该客户端通过http来连接 Elasticsearch集群。用户在使用该客户端时需要将请求数据手动拼接成 Elasticsearch所需JSON格式进行发送,收到响应时同样也需要将返回的JSON数据手动封装成对象。虽然麻烦,不过该客户端兼容所有的 Elasticsearch版本
- Java High Level REST Client:官方提供的高级客户端。该客户端基于低级客户端实现,它提供了很多便捷的
低级和高级客户端创建
High-level client 会依赖 Low-level client 来执行请求, low-level client 则会维护一个请求的线程连接池,因为当 high-level 请求处理结束时,应该 close 掉这个连接,使 low-level client 能尽快释放资源。
client.close();
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.elasticsearch.client.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ElasticsearchConfig {
@Value("${mylasticsearch.elasticsearch.hostlist}")
private String hostlist;
@Bean
public RestHighLevelClient restHighLevelClient() {
// 解析hostlist 信息
String[] split = hostlist.split(",");
// 创建HttpHost数组 封装es的主机和端口
HttpHost[] httpHosts = new HttpHost[split.length];
for (int i = 0; i < split.length; i++) {
String iterm = split[i];
httpHosts[i] = new HttpHost(iterm.split(":")[0], Integer.parseInt(iterm.split(":")[1]), "http");
}
RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
restClientBuilder.setFailureListener(new RestClient.FailureListener() {
@Override
public void onFailure(Node node) {
super.onFailure(node);
System.out.println("出错的节点: " + node);
}
});
// 定义节点选择器 这个是跳过data=false,ingest为false的节点
restClientBuilder.setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS);
// 定义默认请求配置回调
restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
@Override
public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
return requestConfigBuilder.setConnectTimeout(90000) // 连接超时(默认为1秒)
.setSocketTimeout(30000); // 套接字超时(默认为30秒)
}
});
//设置线程数
//一般线程数与本地检测到的处理器数量相同,线程数主要取决于Runtime.getRuntime().availableProcessors()返回的结果
int number = 3;
restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
return httpAsyncClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(number).build());
}
});
return new RestHighLevelClient(restClientBuilder);
}
///项目主要使用RestHighLevelClient,对于低级的客户端暂时不用
// 注意执行完要把这个对象.close();
@Bean
public RestClient restClient() {
// 解析hostlist 信息
String[] split = hostlist.split(",");
// 创建HttpHost数组 封装es的主机和端口
HttpHost[] httpHosts = new HttpHost[split.length];
for (int i = 0; i < split.length; i++) {
String iterm = split[i];
httpHosts[i] = new HttpHost(iterm.split(":")[0], Integer.parseInt(iterm.split(":")[1]), "http");
}
RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
restClientBuilder.setFailureListener(new RestClient.FailureListener() {
@Override
public void onFailure(Node node) {
super.onFailure(node);
System.out.println("出错的节点: " + node);
}
});
// 定义节点选择器 这个是跳过data=false,ingest为false的节点
restClientBuilder.setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS);
// 定义默认请求配置回调
restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
@Override
public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
return requestConfigBuilder.setConnectTimeout(90000) // 连接超时(默认为1秒)
.setSocketTimeout(30000); // 套接字超时(默认为30秒)
}
});
//设置线程数
//一般线程数与本地检测到的处理器数量相同,线程数主要取决于Runtime.getRuntime().availableProcessors()返回的结果
int number = 3;
restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
return httpAsyncClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(number).build());
}
});
return restClientBuilder.build();
}
}
或者
@Configuration
public class ElasticSearchClientConfig {
@Bean
public RestHighLevelClient restHighLevelClient(){
return new RestHighLevelClient(
RestClient.builder(
//有几个集群写几个!!
new HttpHost("127.0.0.1",9200,"http"),
new HttpHost("127.0.0.1",9100,"http")
)
);
}
}
账号密码
@Configuration
public class EsClientConfiguration {
@Value("${spring.elasticsearch.rest.uris}")
private String[] esUris;
@Value("${spring.elasticsearch.rest.connection-timeout}")
private int connectTimeOut; // 连接超时时间
@Value("${spring.elasticsearch.rest.max-connection}")
private int maxConnection; //最大连接数
@Autowired
private Environment environment;
@Bean
public RestHighLevelClient client() {
String userName = environment.getProperty("spring.elasticsearch.rest.username");
String password = environment.getProperty("spring.elasticsearch.rest.password");
HttpHost[] httpHosts = new HttpHost[esUris.length];
//将地址转换为http主机数组,未配置端口则采用默认9200端口,配置了端口则用配置的端口
for (int i = 0; i < httpHosts.length; i++) {
if (!StringUtils.isEmpty(esUris[i])) {
if (esUris[i].contains(":")) {
String[] uris = esUris[i].split(":");
httpHosts[i] = new HttpHost(uris[0], Integer.parseInt(uris[1]), "http");
} else {
httpHosts[i] = new HttpHost(esUris[i], 9200, "http");
}
}
}
//判断,如果未配置用户名,则进行无用户名密码连接,配置了用户名,则进行用户名密码连接
if (StringUtils.isEmpty(userName)) {
RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(httpHosts));
return client;
} else {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
//es账号密码
new UsernamePasswordCredentials(userName, password));
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(httpHosts)
.setHttpClientConfigCallback((httpClientBuilder) -> {
httpClientBuilder.setMaxConnTotal(maxConnection);
httpClientBuilder.disableAuthCaching();
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
return httpClientBuilder;
})
.setRequestConfigCallback(builder -> {
builder.setConnectTimeout(connectTimeOut);
return builder;
})
);
return client;
}
}
}
配置嗅探器
嗅探器允许自动发现运行中的Elasticsearch集群中的节点,并将其设置为现有的RestClient实例。在默认情况下,嗅探器使用nodes info API检索属于集群的节点,并使用jackson解析获得的JSON响应。目前,嗅探器与Elasticsearch 2.X及更高版本兼容。在使用嗅探器之前需添加相关的依赖,代码如下所示:
在创建好RestClient实例(如初始化中代码所示)后,就可以将嗅探器与其进行关联了。嗅探器利用RestClient提供的定期机制(在默认情况下定期时间为5min),从集群中获取当前节点的列表,并通过调用RestClient类中的setNodes方法来更新它们。嗅探器的使用代码详见ServiceImpl实现层的MeetElasticSearchServiceImpl类,部分代码如下所示:
当然,除在客户端启动时配置嗅探器外,还可以在失败时启用嗅探器。这意味着在每次失败后,节点列表都会立即更新,而不是在接下来的普通嗅探循环中更新。
在这种情况下,首先需要创建一个SniffOnFailureListener,然后在创建RestClient时配置。在创建嗅探器后,同一个SniffOnFailureListener实例会相互关联,以便在每次失败时都通知该实例,并使用嗅探器执行嗅探动作。
嗅探器SniffOnFailureListener的使用代码详见ServiceImpl实现层的MeetElasticSearchServiceImpl类,部分代码如下所示:
由于Elasticsearch节点信息API不会返回连接到节点时要使用的协议,而是只返回它们的host:port,因此在默认情况下会使用HTTP。如果需要使用HTTPS,则必须手动创建并提供ElasticSearchNodesNiffer实例,相关代码如下所示: