ES官方推出Java High Level REST Client,它是基于Java Low Level REST Client的封装
Java高级别REST客户端(The Java High Level REST Client)
经过验证,此客户端可用性较高

文档查询: https://www.elastic.co/cn/search?fv-website_area=documentation&fv-product_version=7.x&q=high-level&size=20
具体文档: https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.x/java-rest-high-compatibility.html#java-rest-high-compatibility

客户端的版本号与其所对应(为之开发)的 Elasticsearch 版本号一致,因此应选择与 ES 服务端版本相匹配的客户端版本。

6.x 配置

  1. /**
  2. * Spring configuration that exposes a {@link RestHighLevelClient} bean (6.x setup).
  3. *
  4. * <p>Every setting is injected from a {@code spring.elasticsearch.*} property. The property keys are
  5. * runtime configuration names and are kept exactly as spelled (including {@code socket-timout} and
  6. * {@code connection-request-timout}); they must match the keys in the application's properties.
  7. */
  8. @Configuration
  9. public class ElasticsearchConfiguration {
  10. private static final Logger logger = LoggerFactory.getLogger(ElasticsearchConfiguration.class);
  11. // Comma-separated host names; every host is combined with the same port and scheme below.
  12. @Value("${spring.elasticsearch.host}")
  13. private String host;
  14. @Value("${spring.elasticsearch.port}")
  15. private Integer port;
  16. @Value("${spring.elasticsearch.scheme}")
  17. private String scheme;
  18. @Value("${spring.elasticsearch.connection-timeout}")
  19. private Integer connectionTimeout;
  20. @Value("${spring.elasticsearch.socket-timout}")
  21. private Integer socketTimeout;
  22. @Value("${spring.elasticsearch.connection-request-timout}")
  23. private Integer connectionRequestTimout;
  24. @Value("${spring.elasticsearch.max-connection-num}")
  25. private Integer maxConnectionNum;
  26. @Value("${spring.elasticsearch.max-connection-per-route}")
  27. private Integer maxConnectionPerRoute;
  28. @Value("${spring.elasticsearch.username}")
  29. private String username;
  30. @Value("${spring.elasticsearch.password}")
  31. private String password;
  32. /**
  33. * Builds the high-level client from the injected settings.
  34. *
  35. * @return the configured client, or {@code null} (after logging an error) when the host setting is null or blank
  36. */
  37. @Bean
  38. public RestHighLevelClient restHighLevelClient() {
  39. // A blank value is as unusable as a missing one, so both are rejected the same way.
  40. if (Objects.isNull(host) || host.trim().isEmpty()) {
  41. logger.error("必须指定ElasticSearch服务地址");
  42. return null;
  43. }
  44. String[] hosts = host.split(",");
  45. HttpHost[] httpHosts = new HttpHost[hosts.length];
  46. for (int i = 0; i < hosts.length; i++) {
  47. // Trim each entry so that "es1, es2" does not yield a host name with a leading space.
  48. httpHosts[i] = new HttpHost(hosts[i].trim(), port, scheme);
  49. }
  50. // Pool limits and credentials are set on the HTTP client; the three timeouts on the request config.
  51. RestClientBuilder restClientBuilder = RestClient.builder(httpHosts)
  52. .setHttpClientConfigCallback(httpClientBuilder -> {
  53. httpClientBuilder.setMaxConnTotal(maxConnectionNum);
  54. httpClientBuilder.setMaxConnPerRoute(maxConnectionPerRoute);
  55. httpClientBuilder.setDefaultCredentialsProvider(this.credentialsProvider());
  56. return httpClientBuilder;
  57. })
  58. .setRequestConfigCallback(requestConfigBuilder -> {
  59. requestConfigBuilder.setSocketTimeout(socketTimeout);
  60. requestConfigBuilder.setConnectTimeout(connectionTimeout);
  61. requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimout);
  62. return requestConfigBuilder;
  63. });
  64. return new RestHighLevelClient(restClientBuilder);
  65. }
  66. /**
  67. * Creates a credentials provider holding the configured username and password.
  68. *
  69. * @return provider with the credentials registered for {@link AuthScope#ANY}
  70. */
  71. private CredentialsProvider credentialsProvider() {
  72. CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
  73. credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
  74. return credentialsProvider;
  75. }
  76. }

7.x 配置

Maven引用

  1. <!-- Core library: query / index / builder classes, etc. -->
  2. <dependency>
  3. <groupId>org.elasticsearch</groupId>
  4. <artifactId>elasticsearch</artifactId>
  5. <version>7.2.0</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.elasticsearch.client</groupId>
  9. <artifactId>elasticsearch-rest-client</artifactId>
  10. <version>7.2.0</version>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.elasticsearch.client</groupId>
  14. <artifactId>elasticsearch-rest-high-level-client</artifactId>
  15. <version>7.2.0</version>
  16. <exclusions> <!-- elasticsearch-rest-client is excluded here; it is declared as its own dependency above -->
  17. <exclusion>
  18. <groupId>org.elasticsearch.client</groupId>
  19. <artifactId>elasticsearch-rest-client</artifactId>
  20. </exclusion>
  21. </exclusions>
  22. </dependency>

配置 client

  1. package com.demo.esdemo.config;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.http.HttpHost;
  4. import org.apache.http.auth.AuthScope;
  5. import org.apache.http.auth.UsernamePasswordCredentials;
  6. import org.apache.http.client.CredentialsProvider;
  7. import org.apache.http.impl.client.BasicCredentialsProvider;
  8. import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
  9. import org.apache.http.impl.nio.reactor.IOReactorConfig;
  10. import org.elasticsearch.client.RestClient;
  11. import org.elasticsearch.client.RestHighLevelClient;
  12. import org.springframework.beans.factory.annotation.Value;
  13. import org.springframework.context.annotation.Bean;
  14. import org.springframework.context.annotation.Configuration;
  15. import java.util.Objects;
  16. /**
  17. * Spring configuration that exposes a {@link RestHighLevelClient} bean (7.x setup).
  18. *
  19. * <p>Settings are injected from {@code spring.elasticsearch.*} properties. The keys are runtime
  20. * configuration names and are kept exactly as spelled (including {@code socket-timout} and
  21. * {@code connection-request-timout}); they must match the keys in the application's properties.
  22. */
  23. @Slf4j
  24. @Configuration
  25. public class ElasticsearchConfiguration {
  26. /** Number of I/O reactor threads configured on the async HTTP client. */
  27. private static final int IO_THREAD_COUNT = 20;
  28. // Comma-separated host names; every host is combined with the same port and scheme below.
  29. @Value("${spring.elasticsearch.host}")
  30. private String host;
  31. @Value("${spring.elasticsearch.port}")
  32. private Integer port;
  33. @Value("${spring.elasticsearch.scheme}")
  34. private String scheme;
  35. @Value("${spring.elasticsearch.connection-timeout}")
  36. private Integer connectionTimeout;
  37. @Value("${spring.elasticsearch.socket-timout}")
  38. private Integer socketTimeout;
  39. @Value("${spring.elasticsearch.connection-request-timout}")
  40. private Integer connectionRequestTimout;
  41. @Value("${spring.elasticsearch.username}")
  42. private String username;
  43. @Value("${spring.elasticsearch.password}")
  44. private String password;
  45. /**
  46. * Builds the high-level client; the bean is closed through its {@code close} method on shutdown.
  47. *
  48. * @return the configured client, or {@code null} (after logging an error) when the host setting is null or blank
  49. */
  50. @Bean(destroyMethod = "close")
  51. public RestHighLevelClient restHighLevelClient() {
  52. // A blank value is as unusable as a missing one, so both are rejected the same way.
  53. if (Objects.isNull(host) || host.trim().isEmpty()) {
  54. log.error("必须指定ElasticSearch服务地址");
  55. return null;
  56. }
  57. String[] hosts = host.split(",");
  58. HttpHost[] httpHosts = new HttpHost[hosts.length];
  59. for (int i = 0; i < hosts.length; i++) {
  60. // Trim each entry so that "es1, es2" does not yield a host name with a leading space.
  61. httpHosts[i] = new HttpHost(hosts[i].trim(), port, scheme);
  62. }
  63. return new RestHighLevelClient(RestClient.builder(httpHosts)
  64. .setRequestConfigCallback(requestConfigBuilder -> {
  65. // Timeout settings
  66. requestConfigBuilder.setConnectTimeout(connectionTimeout);
  67. requestConfigBuilder.setSocketTimeout(socketTimeout);
  68. requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimout);
  69. return requestConfigBuilder;
  70. })
  71. .setHttpClientConfigCallback(httpClientBuilder -> {
  72. // I/O reactor thread count
  73. HttpAsyncClientBuilder httpAsyncClientBuilder = httpClientBuilder.setDefaultIOReactorConfig(
  74. IOReactorConfig.custom()
  75. .setIoThreadCount(IO_THREAD_COUNT)
  76. .build());
  77. // Basic-auth credentials
  78. httpAsyncClientBuilder.setDefaultCredentialsProvider(getCredentialsProvider());
  79. return httpAsyncClientBuilder;
  80. })
  81. );
  82. }
  83. /**
  84. * Creates a credentials provider holding the configured username and password.
  85. *
  86. * @return provider with the credentials registered for {@link AuthScope#ANY}
  87. */
  88. private CredentialsProvider getCredentialsProvider() {
  89. CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
  90. credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
  91. return credentialsProvider;
  92. }
  93. }

API使用

https://blog.csdn.net/qq_38620956/article/details/102757513

1. 创建Bool复合查询

  1. BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
  2. // All clauses below are added through filter(...)
  3. // Term query on "name": exact match, the value is not analyzed
  4. boolQuery.filter(QueryBuilders.termQuery("name", name));
  5. // Range query on "time": startTime <= time <= endTime
  6. boolQuery.filter(QueryBuilders.rangeQuery("time").gte(startTime).lte(endTime));
  7. // Terms query on "clientId": not analyzed, matches any one of the given values (the argument below is a placeholder)
  8. boolQuery.filter(QueryBuilders.termsQuery("clientId", customerNumber数组));

2. 构建查询

  1. // 生成索引
  2. List<String> indices = 索引List或数组;
  3. // 查询体
  4. SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
  5. // 放置bool查询 根据chatTime倒序
  6. sourceBuilder.query(boolQuery)
  7. .sort("chatTime", SortOrder.DESC)
  8. .from(offset)
  9. .size(limit);
  10. // 查询请求 将list转为数组
  11. SearchRequest searchRequest = new SearchRequest(indices.toArray(new String[0]));
  12. searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen())
  13. .types("索引类型")
  14. .source(sourceBuilder);

3. 构建聚合

  1. // 根据字段聚合
  2. AggregationBuilder dialogCount = AggregationBuilders
  3. .terms("聚合名称")
  4. .field("id")
  5. .size(300000);
  6. // 先 filter 再聚合
  7. AggregationBuilder interactionDialogCount =
  8. AggregationBuilders
  9. .filter(INTERACTION_DIALOG_COUNT, QueryBuilders.rangeQuery(USER_MESSAGE_COUNT).gt(0));
  10. // 聚合用户消息数
  11. AggregationBuilder userMessageCount =
  12. AggregationBuilders.sum("聚合名称").field("id");

4. 使用client请求

  1. try {
  2. SearchResponse response =
  3. restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
  4. SearchHits hits = response.getHits();
  5. // With the 7.x client the total is a TotalHits object, so its value is read through the getter.
  6. // No hits: return the still-empty pageData.
  7. if (hits.getTotalHits().value == 0) {
  8. return pageData;
  9. }
  10. // Convert the JSON source of every hit into a Dialog.
  11. List<Dialog> results = new ArrayList<>();
  12. for (SearchHit hit : hits.getHits()) {
  13. // Parse with the list's element type; assumes JsonUtils.parseObject(String, Class<T>) returns T — TODO confirm.
  14. Dialog dialog = JsonUtils.parseObject(hit.getSourceAsString(), Dialog.class);
  15. results.add(dialog);
  16. }
  17. } catch (IOException e) {
  18. log.error("解析发生错误", e);
  19. }

查询

  1. 主键id查询 ```java SearchSourceBuilder builder = new SearchSourceBuilder(); builder.query(QueryBuilders.termsQuery(“_id”, id));

SearchRequest searchRequest = new SearchRequest(); searchRequest.indices(indexName); searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); searchRequest.source(builder);

logger.info(builder.toString()); KbArticle result = null; try { SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); SearchHits hits = response.getHits(); SearchHit[] searchHits = hits.getHits(); if (hits.getTotalHits().value == 0) { return result; } // 遍历查询结果 for (SearchHit hit : searchHits) { KbArticle article = objectMapper.readValue(hit.getSourceAsString(), KbArticle.class); article.setId(hit.getId()); result = article; } }

5. 插入

  4. /**
  5. * Sends one index request for {@code data} to {@code indexName}; no document id is set on the request.
  6. *
  7. * @param indexName target index; the same value is also passed as the type argument
  8. * @param data document fields, serialized to a JSON string before sending
  9. * @return 1 when the request completed, 0 when it failed with an {@link IOException}
  10. */
  11. public Integer documentInsert(String indexName, Map<String, Object> data) {
  12. IndexRequest indexRequest = new IndexRequest(indexName, indexName);
  13. indexRequest.source(JSON.toJSONString(data), XContentType.JSON);
  14. try {
  15. rhlClient.index(indexRequest, RequestOptions.DEFAULT);
  16. return 1;
  17. } catch (IOException e) {
  18. // Report the failure to the caller: 0 means nothing was written.
  19. logger.error("documentInsertError index={}", indexName, e);
  20. return 0;
  21. }
  22. }

6. 更新

  1. public Integer documentUpdate(String indexName, Map<String, Object> data, String id) {
  2. UpdateRequest updateRequest = new UpdateRequest(indexName, indexName, id);
  3. updateRequest.doc(data);
  4. // updateRequest.doc(source, XContentType.JSON);
  5. try {
  6. UpdateResponse update = rhlClient.update(updateRequest, RequestOptions.DEFAULT);
  7. if (update.status() != RestStatus.OK) {
  8. throw new IOException();
  9. }
  10. logger.info("[修改ES];[请求]:{} ; [结果]:{}",
  11. updateRequest.toString(), update.toString());
  12. } catch (IOException e) {
  13. e.printStackTrace();
  14. }
  15. return 1;
  16. }

7. 根据主键删除/ 根据条件删除

  1. public Integer documentDelete(String indexName, String id) {
  2. SysTable table = tableMapper.select(tableId);
  3. DeleteRequest deleteRequest = new DeleteRequest(indexName, type, id);
  4. try {
  5. rhlClient.delete(deleteRequest, RequestOptions.DEFAULT);
  6. } catch (IOException e) {
  7. e.printStackTrace();
  8. }
  9. return 1;
  10. }

批量条件删除

  1. // Filter clause: only documents whose enterpriseId equals the given value are targeted.
  2. BoolQueryBuilder enterpriseFilter = QueryBuilders.boolQuery()
  3. .filter(QueryBuilders.termQuery("enterpriseId", enterpriseId));
  4. // Delete-by-query request against the shared index constant.
  5. DeleteByQueryRequest request = new DeleteByQueryRequest(EsConst.INDEX_NAME);
  6. request.setQuery(enterpriseFilter);
  7. request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
  8. request.setConflicts("proceed");
  9. try {
  10. BulkByScrollResponse outcome = restHighLevelClient.deleteByQuery(request, RequestOptions.DEFAULT);
  11. logger.info("[知识库_文档库]-[批量删除ES] ; [结果]:{}", outcome.toString());
  12. } catch (IOException e) {
  13. logger.error("bulkDeleteArticleError ", e);
  14. }

8. 批量插入/删除

  1. public Integer documentBatchInsert(String indenName, List<Map<String, Object>> data) {
  2. BulkRequest bulkRequest = new BulkRequest();
  3. data.forEach(datum -> {
  4. IndexRequest indexRequest = new IndexRequest(indenName, type);
  5. indexRequest.source(JSON.toJSONString(datum), XContentType.JSON);
  6. bulkRequest.add(indexRequest);
  7. });
  8. try {
  9. rhlClient.bulk(bulkRequest, RequestOptions.DEFAULT);
  10. } catch (IOException e) {
  11. e.printStackTrace();
  12. }
  13. return data.size();
  14. }
  1. BulkRequest bulkRequest = new BulkRequest();
  2. String indexName = "";
  3. DeleteRequest deleteRequest = new DeleteRequest().index(indexName).id(id);
  4. bulkRequest.add(deleteRequest);
  5. logger.info(deleteRequest.toString());
  6. BulkResponse bulkResponse = null;
  7. try {
  8. bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
  9. }