参考文档:
Java High Level REST Client
引用
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.3.2</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>6.3.2</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.3.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-beanutils/commons-beanutils -->
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.3</version>
</dependency>
单个示例
client
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("10.20.222.191", 9200, "http")
).build()
x-pack 需要特殊验证
// 阿里云ES集群需要basic auth验证。
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
//访问用户名和密码为您创建阿里云Elasticsearch实例时设置的用户名和密码,也是Kibana控制台的登录用户名和密码。
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("{访问用户名}", "{访问密码}"));
// 通过builder创建rest client,配置http client的HttpClientConfigCallback。
// 单击所创建的Elasticsearch实例ID,在基本信息页面获取公网地址,即为ES集群地址。
RestClientBuilder builder = RestClient.builder(new HttpHost("{ES集群地址}", 9200))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
});
// RestHighLevelClient实例通过REST low-level client builder进行构造。
RestHighLevelClient highClient = new RestHighLevelClient(builder);
create document
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("city", "北京");
jsonMap.put("state", "beijing");
jsonMap.put("location", new HashMap<String,String>(){{
put("lat", "39.1");
put("lon", "116.1");
}});
// 2、设置索引的settings,可以在参数里指定ID
IndexRequest indexRequest = new IndexRequest("city", "doc")
.source(jsonMap);
IndexResponse indexResponse = client.index(indexRequest);
》》》》》》》》》》》》》》》》》》》》》》》》》
# 处理结果
String index = indexResponse.getIndex();
String type = indexResponse.getType();
String id = indexResponse.getId();
long version = indexResponse.getVersion();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
# 新建
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
# 更新
}
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
# 成功的分片个数
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
String reason = failure.reason();
# 失败处理
}
}
根据id查询
GetRequest getRequest = new GetRequest("city", "doc", "AW7Unt433ydeaETTj2DG");
GetResponse getFields = client.get(getRequest);
Map<String, Object> sourceAsMap = getFields.getSourceAsMap();
返回:
{city=北京, location={lon=116.1, lat=39.1}, state=beijing}
根据条件查询
SearchRequest request = new SearchRequest("city");
request.types("doc");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
QueryBuilders.boolQuery();
MatchAllQueryBuilder allQuery = QueryBuilders.matchAllQuery();
sourceBuilder.query(allQuery);
request.source(sourceBuilder);
SearchResponse searchResponse = client.search(request);
SearchHits hits = searchResponse.getHits();
long totalHits = hits.getTotalHits();
float maxScore = hits.getMaxScore();
SearchHit[] searchHits = hits.getHits();
for (SearchHit hit : searchHits) {
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
System.out.println(sourceAsMap);
}
》》》》》》》》》》》》》》》》
{city=Beijing, location={lon=116.41667, lat=39.91667}, state=BJ}
{city=Shanghai, location={lon=121.43333, lat=34.50000}, state=SH}
{city=Xiamen, location={lon=118.10000, lat=24.46667}, state=FJ}
{city=Fuzhou, location={lon=119.30000, lat=26.08333}, state=FJ}
{city=Guangzhou, location={lon=113.23333, lat=23.16667}, state=GD}
{city=北京, location={lon=116.1, lat=39.1}, state=beijing}
根据ID删除
DeleteRequest deleteRequest = new DeleteRequest("city","doc","AW7Unt433ydeaETTj2DG");
DeleteResponse deleteResponse = client.delete(deleteRequest);
》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》
String index = deleteResponse.getIndex();
String type = deleteResponse.getType();
String id = deleteResponse.getId();
long version = deleteResponse.getVersion();
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
# 处理成功分片数量小于总分片数量的情况
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
String reason = failure.reason();
# 处理潜在的故障
}
}
根据id update
重新赋值
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("reason", "daily update");
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
.doc(jsonMap);
执行脚本
UpdateRequest updateRequest = new UpdateRequest("city", "doc", "AW7TsQ0W3ydeaETTj2DA");
Map<String, Object> parameters = new HashMap<String, Object>() {{
put("count", 4); // 可以是数字或者字符串
}};
Script inline = new Script(ScriptType.INLINE, "painless", "ctx._source.count += params.count", parameters);
updateRequest.script(inline);
client.update(updateRequest);
String index = updateResponse.getIndex();
String type = updateResponse.getType();
String id = updateResponse.getId();
long version = updateResponse.getVersion();
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
} else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
# 处理文档被删除的情况
} else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
# 处理文档没有受到更新影响的情况(没有对文档执行任何操作(noop))
}
批量执行
BulkRequest request = new BulkRequest();
request.add(new DeleteRequest("posts", "doc", "3"));
request.add(new UpdateRequest("posts", "doc", "2")
.doc(XContentType.JSON,"other", "test"));
request.add(new IndexRequest("posts", "doc", "4")
.source(XContentType.JSON,"field", "baz"));
BulkResponse bulkResponse = client.bulk(request);
》》》》》》》》》》》
for (BulkItemResponse bulkItemResponse : bulkResponse) {
DocWriteResponse itemResponse = bulkItemResponse.getResponse();
if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
|| bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
IndexResponse indexResponse = (IndexResponse) itemResponse;
} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
UpdateResponse updateResponse = (UpdateResponse) itemResponse;
} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
}
}
scroll实例
SearchRequest searchRequest = new SearchRequest("posts");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(matchQuery("title", "Elasticsearch"));
searchSourceBuilder.size(size);
searchRequest.source(searchSourceBuilder);
searchRequest.scroll(TimeValue.timeValueMinutes(1L));
SearchResponse searchResponse = client.search(searchRequest);
String scrollId = searchResponse.getScrollId();
SearchHits hits = searchResponse.getHits();
第二次,循环知道结果为空
SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
scrollRequest.scroll(TimeValue.timeValueSeconds(30));
SearchResponse searchScrollResponse = client.searchScroll(scrollRequest);
scrollId = searchScrollResponse.getScrollId();
hits = searchScrollResponse.getHits();
assertEquals(3, hits.getTotalHits());
assertEquals(1, hits.getHits().length);
assertNotNull(scrollId);
low-client发送自定义请求
highclient不支持按条件删除,可以通过low-client来解决
封装实例
FaqRestClient.java
import com.alibaba.fastjson.JSON;
import com.unisound.model.FaqModel;
import org.apache.commons.beanutils.BeanMap;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.stereotype.Repository;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Repository
public class FaqRestClient {
private String ip;
private Integer port;
private String userName;
private String password;
private String index;
private String type;
private RestHighLevelClient client;
@PostConstruct
private void init() {
// 阿里云ES集群需要basic auth验证。
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
//访问用户名和密码为您创建阿里云Elasticsearch实例时设置的用户名和密码,也是Kibana控制台的登录用户名和密码。
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password));
// 通过builder创建rest client,配置http client的HttpClientConfigCallback。
// 单击所创建的Elasticsearch实例ID,在基本信息页面获取公网地址,即为ES集群地址。
RestClientBuilder builder = RestClient.builder(new HttpHost(this.ip, this.port))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
});
// RestHighLevelClient实例通过REST low-level client builder进行构造。
client = new RestHighLevelClient(builder);
}
/**
* 返回结果是否为真
*
* @param shardInfo
* @return
*/
private Boolean getSuccess(ReplicationResponse.ShardInfo shardInfo) {
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
String reason = failure.reason();
}
return false;
}
return true;
}
/**
* 插入一个model
*
* @param faqModel
* @return
* @throws IOException
*/
public Boolean insert(FaqModel faqModel) throws IOException {
IndexRequest indexRequest = new IndexRequest(index, type)
.source(new BeanMap(faqModel));
IndexResponse indexResponse = client.index(indexRequest);
return getSuccess(indexResponse.getShardInfo());
}
/**
* 批量插入
*
* @param faqModels
* @return
* @throws IOException
*/
public Boolean inserts(List<FaqModel> faqModels) throws IOException {
BulkRequest bulkRequest = new BulkRequest();
for (FaqModel faqModel : faqModels) {
bulkRequest.add(new IndexRequest(index, type).source(new BeanMap(faqModel)));
}
BulkResponse bulkResponse = client.bulk(bulkRequest);
return !bulkResponse.hasFailures();
}
/**
* 根据id删除
*
* @param id
* @return
* @throws IOException
*/
public Boolean deleteById(String id) throws IOException {
DeleteRequest deleteRequest = new DeleteRequest(index, type, id);
DeleteResponse deleteResponse = client.delete(deleteRequest);
return getSuccess(deleteResponse.getShardInfo());
}
/**
* 批量删除
*
* @param ids
* @return
* @throws IOException
*/
public Boolean deletes(List<String> ids) throws IOException {
BulkRequest bulkRequest = new BulkRequest();
for (String id : ids) {
bulkRequest.add(new DeleteRequest(index, type, id));
}
BulkResponse bulkResponse = client.bulk(bulkRequest);
return !bulkResponse.hasFailures();
}
/**
* 查询
*
* @param sourceBuilder
* @return
* @throws IOException
*/
public List<FaqModel> query(SearchSourceBuilder sourceBuilder) throws Exception {
SearchRequest searchRequest = new SearchRequest(index);
searchRequest.types(type);
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = client.search(searchRequest);
SearchHits hits = searchResponse.getHits();
SearchHit[] searchHits = hits.getHits();
ArrayList<FaqModel> faqModels = new ArrayList<>();
for (SearchHit hit : searchHits) {
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
FaqModel faqModel = new FaqModel();
BeanUtils.populate(faqModel, sourceAsMap);
faqModels.add(faqModel);
}
return faqModels;
}
/**
* 根据id更新
*
* @param faqModel
* @return
* @throws IOException
*/
public Boolean updateById(FaqModel faqModel) throws IOException {
UpdateRequest updateRequest = new UpdateRequest(index, type, faqModel.getId().toString())
.doc(new BeanMap(faqModel));
UpdateResponse updateResponse = client.update(updateRequest);
return getSuccess(updateResponse.getShardInfo());
}
/**
* 根据字段精确查询删除
* {
* "query":{
* "terms":{
* "key":""
* }
* }
* }
*
* @param key
* @param list
* @return
*/
public Integer deleteByQuery(String key, List<String> list) throws Exception {
String jsonStr = "{\n" +
" \"query\":{\n" +
" \"terms\":{\n" +
" \"" + key + "\": " + JSON.toJSONString(list) + "\n" +
" }\n" +
" }\n" +
"}";
RestClient lowLevelClient = client.getLowLevelClient();
HashMap<String, String> param = new HashMap<String, String>() {{
put("conflicts", "proceed");
}};
NStringEntity entity = new NStringEntity(jsonStr, ContentType.APPLICATION_JSON);
Response performResponse = lowLevelClient.performRequest("POST", "/" + index + "/" + type + "/delete_by_query", param, entity);
return JSON.parseObject(EntityUtils.toString(performResponse.getEntity())).getInteger("deleted");
}
}