参考文档:
Java High Level REST Client
引用
<!-- Elasticsearch core library -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>6.3.2</version>
</dependency>

<!-- Transport client artifact -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>6.3.2</version>
</dependency>

<!-- Java High Level REST Client -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.3.2</version>
</dependency>

<!-- https://mvnrepository.com/artifact/commons-beanutils/commons-beanutils -->
<dependency>
    <groupId>commons-beanutils</groupId>
    <artifactId>commons-beanutils</artifactId>
    <version>1.9.3</version>
</dependency>
单个示例
client
RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(new HttpHost("10.20.222.191", 9200, "http")).build()
x-pack 需要特殊验证
// 阿里云ES集群需要basic auth验证。final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();//访问用户名和密码为您创建阿里云Elasticsearch实例时设置的用户名和密码,也是Kibana控制台的登录用户名和密码。credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("{访问用户名}", "{访问密码}"));// 通过builder创建rest client,配置http client的HttpClientConfigCallback。// 单击所创建的Elasticsearch实例ID,在基本信息页面获取公网地址,即为ES集群地址。RestClientBuilder builder = RestClient.builder(new HttpHost("{ES集群地址}", 9200)).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {@Overridepublic HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);}});// RestHighLevelClient实例通过REST low-level client builder进行构造。RestHighLevelClient highClient = new RestHighLevelClient(builder);
create document

// 1. Build the document source. A plain HashMap is used for the nested object
//    instead of double-brace initialisation, which creates an anonymous subclass.
Map<String, String> location = new HashMap<>();
location.put("lat", "39.1");
location.put("lon", "116.1");

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("city", "北京");
jsonMap.put("state", "beijing");
jsonMap.put("location", location);

// 2. Create the index request; a document id can be supplied as an extra constructor argument.
IndexRequest indexRequest = new IndexRequest("city", "doc").source(jsonMap);
IndexResponse indexResponse = client.index(indexRequest);

// ---------------- handling the response ----------------
String index = indexResponse.getIndex();
String type = indexResponse.getType();
String id = indexResponse.getId();
long version = indexResponse.getVersion();

if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
    // the document was created
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    // an existing document was updated
}

ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    // the number of successful shards differs from the total number of shards
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
        String reason = failure.reason();
        // handle the failure
    }
}
根据id查询
GetRequest getRequest = new GetRequest("city", "doc", "AW7Unt433ydeaETTj2DG");GetResponse getFields = client.get(getRequest);Map<String, Object> sourceAsMap = getFields.getSourceAsMap();返回:{city=北京, location={lon=116.1, lat=39.1}, state=beijing}
根据条件查询
// Search the "city" index (type "doc") with a match_all query.
SearchRequest request = new SearchRequest("city");
request.types("doc");

SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
MatchAllQueryBuilder allQuery = QueryBuilders.matchAllQuery();
sourceBuilder.query(allQuery);
request.source(sourceBuilder);

SearchResponse searchResponse = client.search(request);

// Read the hits and print each document's source.
SearchHits hits = searchResponse.getHits();
long totalHits = hits.getTotalHits();
float maxScore = hits.getMaxScore();
SearchHit[] searchHits = hits.getHits();
for (SearchHit hit : searchHits) {
    Map<String, Object> sourceAsMap = hit.getSourceAsMap();
    System.out.println(sourceAsMap);
}

// ---------------- output ----------------
// {city=Beijing, location={lon=116.41667, lat=39.91667}, state=BJ}
// {city=Shanghai, location={lon=121.43333, lat=34.50000}, state=SH}
// {city=Xiamen, location={lon=118.10000, lat=24.46667}, state=FJ}
// {city=Fuzhou, location={lon=119.30000, lat=26.08333}, state=FJ}
// {city=Guangzhou, location={lon=113.23333, lat=23.16667}, state=GD}
// {city=北京, location={lon=116.1, lat=39.1}, state=beijing}
根据ID删除
DeleteRequest deleteRequest = new DeleteRequest("city","doc","AW7Unt433ydeaETTj2DG");DeleteResponse deleteResponse = client.delete(deleteRequest);》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》String index = deleteResponse.getIndex();String type = deleteResponse.getType();String id = deleteResponse.getId();long version = deleteResponse.getVersion();ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();if (shardInfo.getTotal() != shardInfo.getSuccessful()) {# 处理成功分片数量小于总分片数量的情况}if (shardInfo.getFailed() > 0) {for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {String reason = failure.reason();# 处理潜在的故障}}
根据id update
重新赋值
// Collect the new field values and attach them to the update request as its doc.
Map<String, Object> changedFields = new HashMap<>();
changedFields.put("updated", new Date());
changedFields.put("reason", "daily update");

UpdateRequest request = new UpdateRequest("posts", "doc", "1");
request.doc(changedFields);
执行脚本
UpdateRequest updateRequest = new UpdateRequest("city", "doc", "AW7TsQ0W3ydeaETTj2DA");Map<String, Object> parameters = new HashMap<String, Object>() {{put("count", 4); // 可以是数字或者字符串}};Script inline = new Script(ScriptType.INLINE, "painless", "ctx._source.count += params.count", parameters);updateRequest.script(inline);client.update(updateRequest);
String index = updateResponse.getIndex();String type = updateResponse.getType();String id = updateResponse.getId();long version = updateResponse.getVersion();if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {} else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {# 处理文档被删除的情况} else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {# 处理文档没有受到更新影响的情况(没有对文档执行任何操作(noop))}
批量执行
// Queue one delete, one update and one index operation and send them as a single bulk call.
BulkRequest request = new BulkRequest();
request.add(new DeleteRequest("posts", "doc", "3"));
request.add(new UpdateRequest("posts", "doc", "2").doc(XContentType.JSON, "other", "test"));
request.add(new IndexRequest("posts", "doc", "4").source(XContentType.JSON, "field", "baz"));
BulkResponse bulkResponse = client.bulk(request);

// ---------------- handling the response ----------------
// Each item carries the response of one queued operation; cast it according to the operation type.
for (BulkItemResponse item : bulkResponse) {
    DocWriteResponse itemResponse = item.getResponse();
    switch (item.getOpType()) {
        case INDEX:
        case CREATE: {
            IndexResponse indexResponse = (IndexResponse) itemResponse;
            break;
        }
        case UPDATE: {
            UpdateResponse updateResponse = (UpdateResponse) itemResponse;
            break;
        }
        case DELETE: {
            DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
            break;
        }
        default:
            break;
    }
}
scroll实例
// Number of hits returned per scroll page (the page size of every round trip).
final int size = 100;

// First request: run the search and open a scroll context that is kept alive for one minute.
SearchRequest searchRequest = new SearchRequest("posts");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchQuery("title", "Elasticsearch"));
searchSourceBuilder.size(size);
searchRequest.source(searchSourceBuilder);
searchRequest.scroll(TimeValue.timeValueMinutes(1L));

SearchResponse searchResponse = client.search(searchRequest);
String scrollId = searchResponse.getScrollId();
SearchHits hits = searchResponse.getHits();

// Following requests: keep scrolling until a page comes back empty.
while (hits.getHits().length > 0) {
    for (SearchHit hit : hits.getHits()) {
        System.out.println(hit.getSourceAsMap());
    }

    SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
    scrollRequest.scroll(TimeValue.timeValueSeconds(30));
    SearchResponse searchScrollResponse = client.searchScroll(scrollRequest);

    // Always continue with the scroll id of the latest response.
    scrollId = searchScrollResponse.getScrollId();
    hits = searchScrollResponse.getHits();
}

// Release the scroll context instead of waiting for its keep-alive to expire.
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId);
client.clearScroll(clearScrollRequest);
low-client发送自定义请求
本文使用的 High Level REST Client(6.3.2)不支持按条件删除(delete by query),可以通过其内部的 low-level client 发送自定义请求来解决
封装实例
FaqRestClient.java
import com.alibaba.fastjson.JSON;import com.unisound.model.FaqModel;import org.apache.commons.beanutils.BeanMap;import org.apache.commons.beanutils.BeanUtils;import org.apache.http.HttpHost;import org.apache.http.auth.AuthScope;import org.apache.http.auth.UsernamePasswordCredentials;import org.apache.http.client.CredentialsProvider;import org.apache.http.entity.ContentType;import org.apache.http.impl.client.BasicCredentialsProvider;import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;import org.apache.http.nio.entity.NStringEntity;import org.apache.http.util.EntityUtils;import org.elasticsearch.action.bulk.BulkRequest;import org.elasticsearch.action.bulk.BulkResponse;import org.elasticsearch.action.delete.DeleteRequest;import org.elasticsearch.action.delete.DeleteResponse;import org.elasticsearch.action.index.IndexRequest;import org.elasticsearch.action.index.IndexResponse;import org.elasticsearch.action.search.SearchRequest;import org.elasticsearch.action.search.SearchResponse;import org.elasticsearch.action.support.replication.ReplicationResponse;import org.elasticsearch.action.update.UpdateRequest;import org.elasticsearch.action.update.UpdateResponse;import org.elasticsearch.client.Response;import org.elasticsearch.client.RestClient;import org.elasticsearch.client.RestClientBuilder;import org.elasticsearch.client.RestHighLevelClient;import org.elasticsearch.search.SearchHit;import org.elasticsearch.search.SearchHits;import org.elasticsearch.search.builder.SearchSourceBuilder;import org.springframework.stereotype.Repository;import javax.annotation.PostConstruct;import java.io.IOException;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;@Repositorypublic class FaqRestClient {private String ip;private Integer port;private String userName;private String password;private String index;private String type;private RestHighLevelClient client;@PostConstructprivate void init() {// 阿里云ES集群需要basic auth验证。final 
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();//访问用户名和密码为您创建阿里云Elasticsearch实例时设置的用户名和密码,也是Kibana控制台的登录用户名和密码。credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password));// 通过builder创建rest client,配置http client的HttpClientConfigCallback。// 单击所创建的Elasticsearch实例ID,在基本信息页面获取公网地址,即为ES集群地址。RestClientBuilder builder = RestClient.builder(new HttpHost(this.ip, this.port)).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {@Overridepublic HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);}});// RestHighLevelClient实例通过REST low-level client builder进行构造。client = new RestHighLevelClient(builder);}/*** 返回结果是否为真** @param shardInfo* @return*/private Boolean getSuccess(ReplicationResponse.ShardInfo shardInfo) {if (shardInfo.getFailed() > 0) {for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {String reason = failure.reason();}return false;}return true;}/*** 插入一个model** @param faqModel* @return* @throws IOException*/public Boolean insert(FaqModel faqModel) throws IOException {IndexRequest indexRequest = new IndexRequest(index, type).source(new BeanMap(faqModel));IndexResponse indexResponse = client.index(indexRequest);return getSuccess(indexResponse.getShardInfo());}/*** 批量插入** @param faqModels* @return* @throws IOException*/public Boolean inserts(List<FaqModel> faqModels) throws IOException {BulkRequest bulkRequest = new BulkRequest();for (FaqModel faqModel : faqModels) {bulkRequest.add(new IndexRequest(index, type).source(new BeanMap(faqModel)));}BulkResponse bulkResponse = client.bulk(bulkRequest);return !bulkResponse.hasFailures();}/*** 根据id删除** @param id* @return* @throws IOException*/public Boolean deleteById(String id) throws IOException {DeleteRequest deleteRequest = new DeleteRequest(index, type, id);DeleteResponse deleteResponse = 
client.delete(deleteRequest);return getSuccess(deleteResponse.getShardInfo());}/*** 批量删除** @param ids* @return* @throws IOException*/public Boolean deletes(List<String> ids) throws IOException {BulkRequest bulkRequest = new BulkRequest();for (String id : ids) {bulkRequest.add(new DeleteRequest(index, type, id));}BulkResponse bulkResponse = client.bulk(bulkRequest);return !bulkResponse.hasFailures();}/*** 查询** @param sourceBuilder* @return* @throws IOException*/public List<FaqModel> query(SearchSourceBuilder sourceBuilder) throws Exception {SearchRequest searchRequest = new SearchRequest(index);searchRequest.types(type);searchRequest.source(sourceBuilder);SearchResponse searchResponse = client.search(searchRequest);SearchHits hits = searchResponse.getHits();SearchHit[] searchHits = hits.getHits();ArrayList<FaqModel> faqModels = new ArrayList<>();for (SearchHit hit : searchHits) {Map<String, Object> sourceAsMap = hit.getSourceAsMap();FaqModel faqModel = new FaqModel();BeanUtils.populate(faqModel, sourceAsMap);faqModels.add(faqModel);}return faqModels;}/*** 根据id更新** @param faqModel* @return* @throws IOException*/public Boolean updateById(FaqModel faqModel) throws IOException {UpdateRequest updateRequest = new UpdateRequest(index, type, faqModel.getId().toString()).doc(new BeanMap(faqModel));UpdateResponse updateResponse = client.update(updateRequest);return getSuccess(updateResponse.getShardInfo());}/*** 根据字段精确查询删除* {* "query":{* "terms":{* "key":""* }* }* }** @param key* @param list* @return*/public Integer deleteByQuery(String key, List<String> list) throws Exception {String jsonStr = "{\n" +" \"query\":{\n" +" \"terms\":{\n" +" \"" + key + "\": " + JSON.toJSONString(list) + "\n" +" }\n" +" }\n" +"}";RestClient lowLevelClient = client.getLowLevelClient();HashMap<String, String> param = new HashMap<String, String>() {{put("conflicts", "proceed");}};NStringEntity entity = new NStringEntity(jsonStr, ContentType.APPLICATION_JSON);Response performResponse = 
lowLevelClient.performRequest("POST", "/" + index + "/" + type + "/delete_by_query", param, entity);return JSON.parseObject(EntityUtils.toString(performResponse.getEntity())).getInteger("deleted");}}
