参考文档:
Java High Level REST Client

引用

  1. <dependency>
  2. <groupId>org.elasticsearch</groupId>
  3. <artifactId>elasticsearch</artifactId>
  4. <version>6.3.2</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.elasticsearch.client</groupId>
  8. <artifactId>transport</artifactId>
  9. <version>6.3.2</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.elasticsearch.client</groupId>
  13. <artifactId>elasticsearch-rest-high-level-client</artifactId>
  14. <version>6.3.2</version>
  15. </dependency>
  16. <!-- https://mvnrepository.com/artifact/commons-beanutils/commons-beanutils -->
  17. <dependency>
  18. <groupId>commons-beanutils</groupId>
  19. <artifactId>commons-beanutils</artifactId>
  20. <version>1.9.3</version>
  21. </dependency>

单个示例

client

  1. // Since 6.0 the high-level client is constructed from a RestClientBuilder,
  2. // so the builder is passed as-is (no build() call) and the expression is closed properly.
  3. RestHighLevelClient client = new RestHighLevelClient(
  4.         RestClient.builder(
  5.                 new HttpHost("10.20.222.191", 9200, "http")));

x-pack 需要特殊验证

  1. // Alibaba Cloud ES clusters require basic auth.
  2. final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
  3. // The user name and password are the ones set when the Alibaba Cloud Elasticsearch instance was created; they are also the Kibana console login.
  4. credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("{访问用户名}", "{访问密码}"));
  5. // Create the rest client through the builder and configure the http client's HttpClientConfigCallback.
  6. // Click the ID of the created Elasticsearch instance and read the public address on the basic-information page; that is the ES cluster address.
  7. RestClientBuilder builder = RestClient.builder(new HttpHost("{ES集群地址}", 9200))
  8. .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
  9. @Override
  10. public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
  11. return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
  12. }
  13. });
  14. // The RestHighLevelClient instance is constructed from the REST low-level client builder.
  15. RestHighLevelClient highClient = new RestHighLevelClient(builder);

create document

image.png

  1. // Nested object for the "location" field (plain map instead of an anonymous HashMap subclass).
  2. Map<String, String> location = new HashMap<>();
  3. location.put("lat", "39.1");
  4. location.put("lon", "116.1");
  5. Map<String, Object> jsonMap = new HashMap<>();
  6. jsonMap.put("city", "北京");
  7. jsonMap.put("state", "beijing");
  8. jsonMap.put("location", location);
  9. // Build the index request; a document ID can additionally be passed as a constructor argument.
  10. IndexRequest indexRequest = new IndexRequest("city", "doc")
  11.         .source(jsonMap);
  12. IndexResponse indexResponse = client.index(indexRequest);
  13. // ---- Handling the response ----
  14. String index = indexResponse.getIndex();
  15. String type = indexResponse.getType();
  16. String id = indexResponse.getId();
  17. long version = indexResponse.getVersion();
  18. if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
  19.     // the document was created
  20. } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
  21.     // an existing document was updated
  22. }
  23. ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
  24. if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
  25.     // fewer shards succeeded than the total number of shards
  26. }
  27. if (shardInfo.getFailed() > 0) {
  28.     for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
  29.         String reason = failure.reason();
  30.         // handle the failure
  31.     }
  32. }

根据id查询

  1. GetRequest getRequest = new GetRequest("city", "doc", "AW7Unt433ydeaETTj2DG");
  2. GetResponse getFields = client.get(getRequest);
  3. Map<String, Object> sourceAsMap = getFields.getSourceAsMap();
  4. 返回:
  5. {city=北京, location={lon=116.1, lat=39.1}, state=beijing}

根据条件查询

  1. // Search the "city" index, restricted to the "doc" type.
  2. SearchRequest request = new SearchRequest("city");
  3. request.types("doc");
  4. SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
  5. // match_all query: no filtering condition is applied.
  6. MatchAllQueryBuilder allQuery = QueryBuilders.matchAllQuery();
  7. sourceBuilder.query(allQuery);
  8. request.source(sourceBuilder);
  9. SearchResponse searchResponse = client.search(request);
  10. SearchHits hits = searchResponse.getHits();
  11. long totalHits = hits.getTotalHits();
  12. float maxScore = hits.getMaxScore();
  13. // Print the _source of every returned hit.
  14. for (SearchHit hit : hits.getHits()) {
  15.     Map<String, Object> sourceAsMap = hit.getSourceAsMap();
  16.     System.out.println(sourceAsMap);
  17. }
  18. // ---- Sample output ----
  19. // {city=Beijing, location={lon=116.41667, lat=39.91667}, state=BJ}
  20. // {city=Shanghai, location={lon=121.43333, lat=34.50000}, state=SH}
  21. // {city=Xiamen, location={lon=118.10000, lat=24.46667}, state=FJ}
  22. // {city=Fuzhou, location={lon=119.30000, lat=26.08333}, state=FJ}
  23. // {city=Guangzhou, location={lon=113.23333, lat=23.16667}, state=GD}
  24. // {city=北京, location={lon=116.1, lat=39.1}, state=beijing}

根据ID删除

  1. DeleteRequest deleteRequest = new DeleteRequest("city", "doc", "AW7Unt433ydeaETTj2DG");
  2. DeleteResponse deleteResponse = client.delete(deleteRequest);
  3. // ---- Handling the response ----
  4. String index = deleteResponse.getIndex();
  5. String type = deleteResponse.getType();
  6. String id = deleteResponse.getId();
  7. long version = deleteResponse.getVersion();
  8. ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
  9. if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
  10.     // handle the case where fewer shards succeeded than the total number of shards
  11. }
  12. if (shardInfo.getFailed() > 0) {
  13.     for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
  14.         String reason = failure.reason();
  15.         // handle the potential failure
  16.     }
  17. }

根据id update

重新赋值

  1. Map<String, Object> jsonMap = new HashMap<>();
  2. jsonMap.put("updated", new Date());
  3. jsonMap.put("reason", "daily update");
  4. UpdateRequest request = new UpdateRequest("posts", "doc", "1")
  5. .doc(jsonMap);

执行脚本

  1. UpdateRequest updateRequest = new UpdateRequest("city", "doc", "AW7TsQ0W3ydeaETTj2DA");
  2. // Script parameters; a value may be a number or a string.
  3. Map<String, Object> parameters = new HashMap<>();
  4. parameters.put("count", 4);
  5. Script inline = new Script(ScriptType.INLINE, "painless", "ctx._source.count += params.count", parameters);
  6. updateRequest.script(inline);
  7. // Keep the response: the handling code below reads it.
  8. UpdateResponse updateResponse = client.update(updateRequest);
  1. String index = updateResponse.getIndex();
  2. String type = updateResponse.getType();
  3. String id = updateResponse.getId();
  4. long version = updateResponse.getVersion();
  5. if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
  6.     // handle the case where the document was created
  7. } else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
  8.     // handle the case where the document was updated
  9. } else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
  10.     // handle the case where the document was deleted
  11. } else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
  12.     // handle the case where the update did not affect the document (noop)
  13. }

批量执行

  1. BulkRequest request = new BulkRequest();
  2. request.add(new DeleteRequest("posts", "doc", "3"));
  3. request.add(new UpdateRequest("posts", "doc", "2")
  4. .doc(XContentType.JSON,"other", "test"));
  5. request.add(new IndexRequest("posts", "doc", "4")
  6. .source(XContentType.JSON,"field", "baz"));
  7. BulkResponse bulkResponse = client.bulk(request);
  8. 》》》》》》》》》》》
  9. for (BulkItemResponse bulkItemResponse : bulkResponse) {
  10. DocWriteResponse itemResponse = bulkItemResponse.getResponse();
  11. if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
  12. || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
  13. IndexResponse indexResponse = (IndexResponse) itemResponse;
  14. } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
  15. UpdateResponse updateResponse = (UpdateResponse) itemResponse;
  16. } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
  17. DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
  18. }
  19. }

scroll实例

  1. SearchRequest searchRequest = new SearchRequest("posts");
  2. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  3. searchSourceBuilder.query(matchQuery("title", "Elasticsearch"));
  4. searchSourceBuilder.size(size);
  5. searchRequest.source(searchSourceBuilder);
  6. searchRequest.scroll(TimeValue.timeValueMinutes(1L));
  7. SearchResponse searchResponse = client.search(searchRequest);
  8. String scrollId = searchResponse.getScrollId();
  9. SearchHits hits = searchResponse.getHits();
  10. // 第二次及后续请求:循环执行,直到返回结果为空
  11. SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
  12. scrollRequest.scroll(TimeValue.timeValueSeconds(30));
  13. SearchResponse searchScrollResponse = client.searchScroll(scrollRequest);
  14. scrollId = searchScrollResponse.getScrollId();
  15. hits = searchScrollResponse.getHits();
  16. assertEquals(3, hits.getTotalHits());
  17. assertEquals(1, hits.getHits().length);
  18. assertNotNull(scrollId);

low-client发送自定义请求

highclient不支持按条件删除,可以通过low-client来解决

封装实例

FaqRestClient.java

  1. import com.alibaba.fastjson.JSON;
  2. import com.unisound.model.FaqModel;
  3. import org.apache.commons.beanutils.BeanMap;
  4. import org.apache.commons.beanutils.BeanUtils;
  5. import org.apache.http.HttpHost;
  6. import org.apache.http.auth.AuthScope;
  7. import org.apache.http.auth.UsernamePasswordCredentials;
  8. import org.apache.http.client.CredentialsProvider;
  9. import org.apache.http.entity.ContentType;
  10. import org.apache.http.impl.client.BasicCredentialsProvider;
  11. import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
  12. import org.apache.http.nio.entity.NStringEntity;
  13. import org.apache.http.util.EntityUtils;
  14. import org.elasticsearch.action.bulk.BulkRequest;
  15. import org.elasticsearch.action.bulk.BulkResponse;
  16. import org.elasticsearch.action.delete.DeleteRequest;
  17. import org.elasticsearch.action.delete.DeleteResponse;
  18. import org.elasticsearch.action.index.IndexRequest;
  19. import org.elasticsearch.action.index.IndexResponse;
  20. import org.elasticsearch.action.search.SearchRequest;
  21. import org.elasticsearch.action.search.SearchResponse;
  22. import org.elasticsearch.action.support.replication.ReplicationResponse;
  23. import org.elasticsearch.action.update.UpdateRequest;
  24. import org.elasticsearch.action.update.UpdateResponse;
  25. import org.elasticsearch.client.Response;
  26. import org.elasticsearch.client.RestClient;
  27. import org.elasticsearch.client.RestClientBuilder;
  28. import org.elasticsearch.client.RestHighLevelClient;
  29. import org.elasticsearch.search.SearchHit;
  30. import org.elasticsearch.search.SearchHits;
  31. import org.elasticsearch.search.builder.SearchSourceBuilder;
  32. import org.springframework.stereotype.Repository;
  33. import javax.annotation.PostConstruct;
  34. import java.io.IOException;
  35. import java.util.ArrayList;
  36. import java.util.HashMap;
  37. import java.util.List;
  38. import java.util.Map;
  39. /**
  40.  * Spring repository that wraps an Elasticsearch {@code RestHighLevelClient} for FAQ documents.
  41.  * Every operation targets the configured {@code index} / {@code type}.
  42.  */
  43. @Repository
  44. public class FaqRestClient {
  45.     // NOTE(review): no injection annotations or setters for these fields are visible in this
  46.     // listing; they must be populated before init() runs - confirm how they are configured.
  47.     private String ip;
  48.     private Integer port;
  49.     private String userName;
  50.     private String password;
  51.     private String index;
  52.     private String type;
  53.     private RestHighLevelClient client;
  54.     /**
  55.      * Builds the high-level client with HTTP basic authentication after bean construction.
  56.      */
  57.     @PostConstruct
  58.     private void init() {
  59.         // Alibaba Cloud ES clusters require basic auth.
  60.         final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
  61.         // Same user name / password as the Kibana console login of the instance.
  62.         credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password));
  63.         // Register the credentials on the underlying HTTP client through HttpClientConfigCallback.
  64.         RestClientBuilder builder = RestClient.builder(new HttpHost(this.ip, this.port))
  65.                 .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
  66.                     @Override
  67.                     public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
  68.                         return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
  69.                     }
  70.                 });
  71.         // The high-level client is constructed from the low-level client builder.
  72.         client = new RestHighLevelClient(builder);
  73.     }
  74.     /**
  75.      * Closes the client when the bean is destroyed so its HTTP resources are released.
  76.      */
  77.     @javax.annotation.PreDestroy
  78.     private void destroy() {
  79.         if (client == null) {
  80.             return;
  81.         }
  82.         try {
  83.             client.close();
  84.         } catch (IOException e) {
  85.             throw new java.io.UncheckedIOException("Failed to close Elasticsearch client", e);
  86.         }
  87.     }
  88.     /**
  89.      * Tells whether a write reached its shards without any failure.
  90.      *
  91.      * @param shardInfo shard information taken from a write response
  92.      * @return {@code true} when no shard reported a failure, {@code false} otherwise
  93.      */
  94.     private Boolean getSuccess(ReplicationResponse.ShardInfo shardInfo) {
  95.         return shardInfo.getFailed() <= 0;
  96.     }
  97.     /**
  98.      * Indexes a single model; no document ID is supplied in the request.
  99.      *
  100.      * @param faqModel the model whose bean properties become the document source
  101.      * @return {@code true} when no shard reported a failure
  102.      * @throws IOException if the request to Elasticsearch fails
  103.      */
  104.     public Boolean insert(FaqModel faqModel) throws IOException {
  105.         // NOTE(review): BeanMap exposes every readable bean property - confirm that all of
  106.         // them are meant to be stored in the document.
  107.         IndexRequest indexRequest = new IndexRequest(index, type)
  108.                 .source(new BeanMap(faqModel));
  109.         IndexResponse indexResponse = client.index(indexRequest);
  110.         return getSuccess(indexResponse.getShardInfo());
  111.     }
  112.     /**
  113.      * Indexes several models with one bulk request.
  114.      *
  115.      * @param faqModels the models to index; {@code null} or empty means nothing to do
  116.      * @return {@code true} when no item of the bulk request failed
  117.      * @throws IOException if the request to Elasticsearch fails
  118.      */
  119.     public Boolean inserts(List<FaqModel> faqModels) throws IOException {
  120.         // A bulk request without actions is rejected by request validation, so skip the call.
  121.         if (faqModels == null || faqModels.isEmpty()) {
  122.             return true;
  123.         }
  124.         BulkRequest bulkRequest = new BulkRequest();
  125.         for (FaqModel faqModel : faqModels) {
  126.             bulkRequest.add(new IndexRequest(index, type).source(new BeanMap(faqModel)));
  127.         }
  128.         BulkResponse bulkResponse = client.bulk(bulkRequest);
  129.         return !bulkResponse.hasFailures();
  130.     }
  131.     /**
  132.      * Deletes one document by ID.
  133.      *
  134.      * @param id the document ID
  135.      * @return {@code true} when no shard reported a failure
  136.      * @throws IOException if the request to Elasticsearch fails
  137.      */
  138.     public Boolean deleteById(String id) throws IOException {
  139.         DeleteRequest deleteRequest = new DeleteRequest(index, type, id);
  140.         DeleteResponse deleteResponse = client.delete(deleteRequest);
  141.         return getSuccess(deleteResponse.getShardInfo());
  142.     }
  143.     /**
  144.      * Deletes several documents by ID with one bulk request.
  145.      *
  146.      * @param ids the document IDs; {@code null} or empty means nothing to do
  147.      * @return {@code true} when no item of the bulk request failed
  148.      * @throws IOException if the request to Elasticsearch fails
  149.      */
  150.     public Boolean deletes(List<String> ids) throws IOException {
  151.         // A bulk request without actions is rejected by request validation, so skip the call.
  152.         if (ids == null || ids.isEmpty()) {
  153.             return true;
  154.         }
  155.         BulkRequest bulkRequest = new BulkRequest();
  156.         for (String id : ids) {
  157.             bulkRequest.add(new DeleteRequest(index, type, id));
  158.         }
  159.         BulkResponse bulkResponse = client.bulk(bulkRequest);
  160.         return !bulkResponse.hasFailures();
  161.     }
  162.     /**
  163.      * Runs a search and converts every hit's source into a {@code FaqModel}.
  164.      *
  165.      * @param sourceBuilder the search definition (query, paging, sorting, ...)
  166.      * @return the models built from the returned hits, in hit order
  167.      * @throws Exception if the search fails or a hit cannot be copied into a model
  168.      */
  169.     public List<FaqModel> query(SearchSourceBuilder sourceBuilder) throws Exception {
  170.         SearchRequest searchRequest = new SearchRequest(index);
  171.         searchRequest.types(type);
  172.         searchRequest.source(sourceBuilder);
  173.         SearchResponse searchResponse = client.search(searchRequest);
  174.         SearchHit[] searchHits = searchResponse.getHits().getHits();
  175.         List<FaqModel> faqModels = new ArrayList<>(searchHits.length);
  176.         for (SearchHit hit : searchHits) {
  177.             FaqModel faqModel = new FaqModel();
  178.             // Copy the source fields onto the bean properties of the same name.
  179.             BeanUtils.populate(faqModel, hit.getSourceAsMap());
  180.             faqModels.add(faqModel);
  181.         }
  182.         return faqModels;
  183.     }
  184.     /**
  185.      * Partially updates the document whose ID is {@code faqModel.getId()}.
  186.      *
  187.      * @param faqModel the model carrying the ID and the new field values
  188.      * @return {@code true} when no shard reported a failure
  189.      * @throws IOException if the request to Elasticsearch fails
  190.      */
  191.     public Boolean updateById(FaqModel faqModel) throws IOException {
  192.         UpdateRequest updateRequest = new UpdateRequest(index, type, faqModel.getId().toString())
  193.                 .doc(new BeanMap(faqModel));
  194.         UpdateResponse updateResponse = client.update(updateRequest);
  195.         return getSuccess(updateResponse.getShardInfo());
  196.     }
  197.     /**
  198.      * Deletes every document whose field matches one of the given values exactly.
  199.      * The request body has the form:
  200.      * <pre>
  201.      * {
  202.      *   "query": {
  203.      *     "terms": {
  204.      *       "key": ["value1", "value2"]
  205.      *     }
  206.      *   }
  207.      * }
  208.      * </pre>
  209.      *
  210.      * @param key  the field name to match on
  211.      * @param list the accepted values of that field
  212.      * @return the {@code deleted} count reported in the response
  213.      * @throws Exception if the request fails or the response cannot be parsed
  214.      */
  215.     public Integer deleteByQuery(String key, List<String> list) throws Exception {
  216.         // Build the body from maps so that the field name and values are JSON-escaped.
  217.         Map<String, Object> terms = new HashMap<>();
  218.         terms.put(key, list);
  219.         Map<String, Object> query = new HashMap<>();
  220.         query.put("terms", terms);
  221.         Map<String, Object> body = new HashMap<>();
  222.         body.put("query", query);
  223.         String jsonStr = JSON.toJSONString(body);
  224.         RestClient lowLevelClient = client.getLowLevelClient();
  225.         // conflicts=proceed: version conflicts do not abort the operation.
  226.         Map<String, String> param = new HashMap<>();
  227.         param.put("conflicts", "proceed");
  228.         NStringEntity entity = new NStringEntity(jsonStr, ContentType.APPLICATION_JSON);
  229.         // The API endpoint is "_delete_by_query" (leading underscore is required).
  230.         Response performResponse = lowLevelClient.performRequest("POST", "/" + index + "/" + type + "/_delete_by_query", param, entity);
  231.         return JSON.parseObject(EntityUtils.toString(performResponse.getEntity())).getInteger("deleted");
  232.     }
  233. }
  202. }