简介
官方文档
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high.html
csdn翻译的
https://blog.csdn.net/hbtj_1216/article/details/84880057
刷新策略
RefreshPolicy
可知有以下三种刷新策略:
- RefreshPolicy#IMMEDIATE:
请求向ElasticSearch提交了数据,立即进行数据刷新,然后再结束请求。
优点:实时性高、操作延时短。
缺点:资源消耗高。
- RefreshPolicy#WAIT_UNTIL:
请求向ElasticSearch提交了数据,等待数据完成刷新,然后再结束请求。
优点:实时性高、操作延时长。
缺点:资源消耗低。
- RefreshPolicy#NONE:
默认策略。
请求向ElasticSearch提交了数据,不关系数据是否已经完成刷新,直接结束请求。
优点:操作延时短、资源消耗低。
缺点:实时性低。
实现此接口的主要类如下:
- DeleteRequestBuilder
- IndexRequestBuilder
- UpdateRequestBuilder
BulkRequestBuilder
DeleteRequest request = new DeleteRequest("demo_index", "1");
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
添加文档
//添加文档
/*
POST /elasticsearch_test/_doc/1
{
"name": "spring cloud实战",
"description": "本课程主要从四个章节进行讲解: 1.微服务架构入门 2.spring cloud 基础入门 3.实战Spring Boot 4.注册中心eureka。",
"studymodel":"201001",
"timestamp": "2020-08-22 20:09:18",
"price": 5.6
}
*/
同步请求
进行同步请求:使用client.index(indexRequest, RequestOptions.DEFAULT);方法 ```java @Test public void testAddDoc() throws IOException { IndexRequest indexRequest = new IndexRequest(“elasticsearch_test”);
/** 如果操作设置为DocWriteRequest.OpType.INDEX(默认值),如果文档存在,则更新文档;如果文档不存在,则创建文档 如果操作设置为DocWriteRequest.OpType.CREATE,则是指定为创建文档操作,如果对象的文档(根据id判断)存在,则报错如下:
ElasticsearchStatusException[Elasticsearch exception
[type=version_conflict_engine_exception,
reason=[doc][16]: version conflict, document already exists
索引操作只能为以上两种操作值,不能为UPDATE和DELETE */ indexRequest.timeout(TimeValue.timeValueSeconds(5)); indexRequest.opType(DocWriteRequest.OpType.INDEX);
//indexRequest.id(“2”); // 文档内容 准备json数据 Map
jsonMap = new HashMap<>(); jsonMap.put(“name”, “spring cloud实战3”); jsonMap.put(“description”, “本课程主要从四个章节进行讲解3: 1.微服务架构入门 2.spring cloud 基础入门 3.实战Spring Boot 4.注册中心eureka。”); jsonMap.put(“studymodel”, “3101001”); jsonMap.put(“timestamp”, “2020-07-22 20:09:18”); jsonMap.put(“price”, 35.6); indexRequest.source(jsonMap); // 执行请求 try {
IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
DocWriteResponse.Result result = indexResponse.getResult();
System.out.println(result);
String index = indexResponse.getIndex();
String id = indexResponse.getId();
long version = indexResponse.getVersion();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
System.out.println("添加成功");
System.out.println("id:" + id);
System.out.println("version:" + version);
System.out.println("index:" + index);
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
System.out.println("更新成功");
System.out.println("index:" + index);
System.out.println("id:" + id);
System.out.println("version:" + version);
}
}catch (ElasticsearchException e) {
if (e.status() == RestStatus.CONFLICT) {
System.out.println("创建的文档与已存在的发生冲突");
}
}
client.close();
}
<a name="M6iyl"></a>
## 异步请求
创建异步请求的,回调对象:ActionListener<IndexResponse><br />如果执行成功,会自动调用onResponse方法,如果执行失败,会回调onFailure方法<br />可以从传入的IndexResponse和Exception类型参数中获取相关创建情况信息
```java
@Test
public void myTest() throws Exception {
IndexRequest indexRequest = new IndexRequest("elasticsearch_test");
indexRequest.timeout(TimeValue.timeValueSeconds(5));
indexRequest.opType(DocWriteRequest.OpType.INDEX);
//indexRequest.id("2");
// 文档内容 XContentBuilder,elasticsearch内容助手会根据该对象自动生成json格式内容进行保存
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.field("name", "spring cloud实战1");
builder.timeField("timestamp", new Date());
builder.field("description", "本课程主要从四个章节进行讲解3: 1.微服务架构入门 2.spring cloud 基础入门 3.实战Spring Boot 4.注册中心eureka。");
builder.field("studymodel", "3101001");
builder.field("price", 35.6);
}
builder.endObject();
indexRequest.source(builder);
// 执行请求
ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
String index = indexResponse.getIndex();
String id = indexResponse.getId();
long version = indexResponse.getVersion();
DocWriteResponse.Result result = indexResponse.getResult();
System.out.println(result);
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
System.out.println("添加成功");
System.out.println("index:" + index);
System.out.println("id:" + id);
System.out.println("version:" + version);
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
System.out.println("更新成功");
System.out.println("index:" + index);
System.out.println("id:" + id);
System.out.println("version:" + version);
}
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
String reason = failure.reason();
}
}
}
@Override
public void onFailure(Exception e) {
e.printStackTrace();
}
};
//进行异步请求:将请求对象、和回调对象作为参数传入
client.indexAsync(indexRequest, RequestOptions.DEFAULT, listener);
TimeUnit.SECONDS.sleep(10);
//不被注释掉,可能还没有将请求发送出去,连接就会被关闭,从而创建或更新失败
//client.close();
}
查询文档
为请求对象设置参数:
- 设置查询的版本
getRequest.version(2) 可以不设置,如果设置所要查询的文档版本号为2,如果当前版本号为4,则查询失败,报错如下
设置查询的参数
查询文档中指定的字段(如message和以Date结尾的字段): String[] includes = new String[]{“message”, “*Date”}查询文档中所有的字段:String[] includes = Strings.EMPTY_ARRAY;
过滤掉文档中指定的字段(如message): String[] excludes = new String[]{“message”};
不过滤文档中任何字段:String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
其中includes可以传null,此时表示查询所有的字段
excludes也可以为null,此时表示不过滤掉任何字段
注: 如果includes和excludes包含同一个字段,则excludes起作用,也就是查询的字段会被过滤掉
将设置的查询参数赋值给请求对象
getRequest.fetchSourceContext(fetchSourceContext);
如果不希望返回结果中包含内容,可以进行如下设置
getRequest.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);
默认是返回全部内容的
@Test
public void testGetDoc() throws IOException {
// 查询请求对象
GetRequest getRequest = new GetRequest("elasticsearch_test", "12");
//可以不设置,如果设置所要查询的文档版本号为2,如果当前版本号为4,则查询失败
// getRequest.version(1);
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, new String[]{"name", "*price", "timestamp"}, Strings.EMPTY_ARRAY);
getRequest.fetchSourceContext(fetchSourceContext);
try {
GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
System.out.println("Index: " + getResponse.getIndex() + "; Id: " + getResponse.getId());
if (getResponse.isExists()) {
System.out.println("version: " + getResponse.getVersion() + "; 内容字符串: " + getResponse.getSourceAsString());//打印文档的内容
System.out.println(getResponse);//返回的全部内容和命令式一样的
// 得到文档内容
Map<String, Object> sourceMap = getResponse.getSourceAsMap();
// byte[] sourceAsBytes = getResponse.getSourceAsBytes();
System.out.println(sourceMap);
} else {
System.out.println("查询的文档不存在!");
}
} catch (ElasticsearchException e) {
if (e.status() == RestStatus.NOT_FOUND) {
System.out.println("所查询的文档库不存在!");
}else if(e.status() == RestStatus.CONFLICT) {
System.out.println("指定version与当前版本冲突");
}
}
}
全部记录
/*
GET /elasticsearch_test/_search
{
"query":{
"match_all":{}
}
}
*/
@Test
public void testSearchAll() throws IOException {
// 搜索请求对象
SearchRequest searchRequest = new SearchRequest("elasticsearch_test");
searchRequest.searchType(SearchType.QUERY_THEN_FETCH);
// 搜索源构建对象
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// 设置搜索方法
searchSourceBuilder.query(QueryBuilders.matchAllQuery());
// 设置查询出那些字段, 不查出那些字段
searchSourceBuilder.fetchSource(new String[]{"name", "price", "timestamp"}, new String[]{});
// 请求对象设置 搜索源对象
searchRequest.source(searchSourceBuilder);
// 使用client 执行搜索
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
// 搜索结果
SearchHits hits = searchResponse.getHits();
// 匹配到的总记录数
TotalHits totalHits = hits.getTotalHits();
System.out.println("查询到的总记录数:" + totalHits.value);
// 得到的匹配度高的文档
SearchHit[] searchHits = hits.getHits();
for (SearchHit hit : searchHits) {
String id = hit.getId();
// 源文档的内容
Map<String, Object> sourceMap = hit.getSourceAsMap();
String name = (String) sourceMap.get("name");
String timestamp = (String) sourceMap.get("timestamp");
String description = (String) sourceMap.get("description");
Double price = (Double) sourceMap.get("price");
System.out.println(name);
System.out.println(timestamp);
System.out.println(description);
System.out.println(price);
}
}
更新文档
以下情况
- 如果将request.docAsUpsert(true) 和request.scriptedUpsert(true)注释掉或都设置为false,
- 如果将request.docAsUpsert(true) 设置为faluse,而request.scriptedUpsert(true)为true
且文档不存在,则创建出来的文档内容为:
{“created”:”2017-01-01”},
即只有request.upsert(jsonString, XContentType.JSON)中的jsonString内容被创建,而request.doc(jsonMap)中的jsonMap内容没被创建
此例中如果文档不存在,且这样设置:request.scriptedUpsert(true);、request.docAsUpsert(false);,则会创建一个空内容的文档,因为脚本中没有内容,而禁止通过doc创建新文档
如果不使用request.upsert方法,且request.scriptedUpsert(false);和request.docAsUpsert(false);都设置为false,则文档不存在时提示没有找到文档,而不会创建新的文档
如果request.docAsUpsert(true)和request.scriptedUpsert(true)都设置为true,且
request.doc(jsonMap)被注释掉时,会报错如下: org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: script or doc is missing;2: doc must be specified if doc_as_upsert is enabled;
即如果开启动了doc_as_upsert方法,则必须使用doc方法传入需要更新的内容
- 注:单机不要使用如下方法,否则会报超时异常
// request.waitForActiveShards(2); // request.waitForActiveShards(ActiveShardCount.ALL);
//创建请求对象
//索引名称,id
UpdateRequest updateRequest = new UpdateRequest("elasticsearch_test", "1");
// 等待主分片可用的超时时间
updateRequest.timeout(TimeValue.timeValueSeconds(3));
//WAIT_UNTIL 一直保持请求连接中,直接当所做的更改对于搜索查询可见时的刷新发生后,再将结果返回
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
// 如果更新的过程中,文档被其它线程进行更新的话,会产生冲突,这个为设置更新失败后重试的次数
updateRequest.retryOnConflict(3);
// 是否将文档内容作为结果返回,默认是禁止的
updateRequest.fetchSource(true);
// 设置希望在返回结果中返回的字段值
String[] includes = new String[]{"name", "t*"};
String[] excludes = Strings.EMPTY_ARRAY;
updateRequest.fetchSource(new FetchSourceContext(false, includes, excludes));
// NO OPeration,空操作检查,默认情况为true,只有原来的source和新的source存在不同的字段情况下才会重建索引,
// 如果一模一样是不会触发重建索引的,如果将detect_noop=false不管内容有没有变化都会重建索引,这一点可以通过version的值的变化来发现
updateRequest.detectNoop(true);
// 设置在更新操作执行之前,要求活动状态的分片副本数;单机不要设置,否则会报错:超时
//updateRequest.waitForActiveShards(2);
//updateRequest.waitForActiveShards(ActiveShardCount.ALL);
// jsonMap和jsonString只是两种不同的传参方式,可以相互转换使用,效果相同
//更新后的数据
// Doc doc = new Doc();
// doc.setName("spring cloud实战111");
// updateRequest.doc(JSON.toJSONString(doc), XContentType.JSON);
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("name", "spring cloud实战3252");
jsonMap.put("description", "本课程主要从四个章节进行讲解3: 1.微服务架构入门 2.spring cloud 基础入门 3.实战Spring Boot 4.注册中心eureka。");
jsonMap.put("studymodel", "3101001");
jsonMap.put("timestamp", "2020-07-22 20:09:18");
jsonMap.put("price", 35.6);
updateRequest.doc(jsonMap);
// true,表明如果文档不存在,则新更新的文档内容作为新的内容插入文档,这个和scriptedUpsert的区别是:更新文档的两种不同方式,有的使用doc方法更新有的使用脚本更新
updateRequest.docAsUpsert(true);
// 为true,表明无论文档是否存在,脚本都会执行(如果不存在时,会创建一个新的文档)
updateRequest.scriptedUpsert(true);
// 如果文档不存在,使用upsert方法,会根据更新内容创建新的文档
// 需要更新的内容,以json字符串方式提供, 可以对象转
String jsonString = "{\"timestamp\":\"2020-07-11 20:09:18\"}";
updateRequest.upsert(jsonString, XContentType.JSON);
try {
UpdateResponse updateResponse = client.update(updateRequest, RequestOptions.DEFAULT);
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
System.out.println("文档创建成功!");
}else if(updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
// 任何一个字段的更新,都算更新操作,即使只是日期字段的值变化
System.out.println("文档更新成功!");
}else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
System.out.println("文档删除成功!");
} else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
// 如果request.detectNoop(true);中设置为false,则这个永远不会进入
System.out.println("文档无变化!");
}
String index = updateResponse.getIndex();
String type = updateResponse.getType();
String id = updateResponse.getId();
long version = updateResponse.getVersion();
System.out.println("index:" + index + "; type:" + type + "; id:" + id + ",version:" + version);
ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
System.out.println("未完全执行所有分片,总分片数为:" + shardInfo.getTotal() + ",执行的分片数为:"+ shardInfo.getSuccessful());
}
// fetchSource 如果设置需要返回结果中包含内容了,如果没有设置返回内容,则result 等于null
GetResult result = updateResponse.getGetResult();
if(result == null) {
System.out.println("无内容结果返回");
}else if (result.isExists()) {
// 此例中如果文档不存在,且这样设置:request.scriptedUpsert(true);、request.docAsUpsert(false);,则会创建一个空内容的文档,因为脚本中没有内容,而禁止doc创建新文档
String sourceAsString = result.sourceAsString();
System.out.println(sourceAsString);
Map<String, Object> sourceAsMap = result.sourceAsMap();
}
} catch (ElasticsearchException e) {
if (e.status() == RestStatus.NOT_FOUND) {
// 如果不使用request.upsert方法,且request.scriptedUpsert(false);和request.docAsUpsert(false);都设置为false,则文档不存在时提示没有找到文档
System.out.println("文档不存在");
}else if(e.status() == RestStatus.CONFLICT) {
System.out.println("需要删除的文档版本与现在文档冲突!");
}
} catch (Exception e) {
e.printStackTrace();
}
版本控制
查询
// 查询请求对象
GetRequest getRequest = new GetRequest("book", "6");
//可以不设置,如果设置所要查询的文档版本号为2,如果当前版本号为4,则查询失败
// getRequest.version(1);
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, new String[]{"name", "*price", "timestamp"}, Strings.EMPTY_ARRAY);
getRequest.fetchSourceContext(fetchSourceContext);
try {
GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
System.out.println(getResponse.getPrimaryTerm() + "---" + getResponse.getSeqNo());
操作时带上
IndexRequest indexRequest = new IndexRequest("elasticsearch_test");
indexRequest.timeout(TimeValue.timeValueSeconds(5));
indexRequest.opType(DocWriteRequest.OpType.INDEX);
indexRequest.setIfPrimaryTerm(1L);
indexRequest.setIfSeqNo(11L);
删除文档
创建请求对象
DeleteRequest request = new DeleteRequest(“posts”, “20”); //指定版本 DeleteRequest request = new DeleteRequest(“demo_index”, “1”).version(2);�
设置请求参数
// 等待主分片可用的超时时间 request.timeout(TimeValue.timeValueMinutes(10));
执行同步请求
DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
查看返回结果 ```java // 判断删除的文档是否存在 deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND
// 查看分片执行情况信息 ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo(); if (shardInfo.getTotal() != shardInfo.getSuccessful()) { System.out.println(“未完全执行所有分片,总分片数为:” + shardInfo.getTotal() + “,执行的分片数为:”+ shardInfo.getSuccessful()); }
if (shardInfo.getFailed() > 0) { for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) { String reason = failure.reason(); System.out.println(“失败原因:” + reason); return; } }
<a name="TDC2j"></a>
## 同步删除
```java
//创建请求对象
//索引名称,id
DeleteRequest request = new DeleteRequest("elasticsearch_test", "12");
// DeleteRequest request = new DeleteRequest("demo_index", "1").version(2);
// 等待主分片可用的超时时间
request.timeout(TimeValue.timeValueMinutes(10));
//WAIT_UNTIL 一直保持请求连接中,直接当所做的更改对于搜索查询可见时的刷新发生后,再将结果返回
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
try {
DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
System.out.println(deleteResponse.status());
String index = deleteResponse.getIndex();
String id = deleteResponse.getId();
long version = deleteResponse.getVersion();
System.out.println("index:" + index + "; id:" + id + ",version:" + version);
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
System.out.println("未完全执行所有分片,总分片数为:" + shardInfo.getTotal() + ",执行的分片数为:"+ shardInfo.getSuccessful());
// return;
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
String reason = failure.reason();
System.out.println("失败原因:" + reason);
return;
}
}
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
System.out.println("未找到需要删除的文档!");
return;
}
} catch (ElasticsearchException e) {
if(e.status() == RestStatus.CONFLICT) {
System.out.println("需要删除的文档版本与现在文档冲突!");
}
} catch (Exception e) {
e.printStackTrace();
}
异步删除
注意
- 所有的异步操作,一定不要将连接client关闭太早,否则异步操作还没执行程序就被终止了
所以同步方法创作的连接对象可以放在try(client)条件中,执行完自动关闭。而异步方法的连接则不可以
- 注意:如果对一个文档添加版本条件(如id为2,版本为2)进行删除(删除后文档version变为3),再次删除时,
DeleteRequest request = new DeleteRequest(“posts”, “2”).version(2);
报错如下,而不是提示找不到文档:
{
"error":{
"root_cause":[
{
"type":"version_conflict_engine_exception",
"reason":"[doc][2]: version conflict, current version [3] is different than the one provided [2]",
"index_uuid":"60e-U9cXSYqFGC34_gTrug",
"shard":"2",
"index":"posts"
}
],
"type":"version_conflict_engine_exception",
"reason":"[doc][2]: version conflict, current version [3] is different than the one provided [2]",
"index_uuid":"60e-U9cXSYqFGC34_gTrug",
"shard":"2",
"index":"posts"
},
"status":409
}
//创建请求对象
//索引名称,id
DeleteRequest request = new DeleteRequest("elasticsearch_test", "12");
// DeleteRequest request = new DeleteRequest("elasticsearch_test", "1").version(2);
// 等待主分片可用的超时时间
request.timeout(TimeValue.timeValueMinutes(10));
//WAIT_UNTIL 一直保持请求连接中,直接当所做的更改对于搜索查询可见时的刷新发生后,再将结果返回
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
// 异步回调对象
ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
System.out.println("未找到需要删除的文档!");
return;
}
String index = deleteResponse.getIndex();
String id = deleteResponse.getId();
long version = deleteResponse.getVersion();
System.out.println("index:" + index + id + ",version:" + version);
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
System.out.println("未完全执行所有分片,总分片数为:" + shardInfo.getTotal() + ",执行的分片数为:"+ shardInfo.getSuccessful());
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
String reason = failure.reason();
System.out.println("失败原因:" + reason);
return;
}
}
}
@Override
public void onFailure(Exception e) {
}
};
client.deleteAsync(request, RequestOptions.DEFAULT, listener);
TimeUnit.SECONDS.sleep(1);
批量操作数据
- BulkRequest对象可以用来在一次请求中,执行多个索引、更新或删除操作
且允许在一次请求中进行不同的操作,即一次请求中索引、更新、删除操作可以同时存在
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(new DeleteRequest("posts").id("300"));
bulkRequest.add(new UpdateRequest("posts").id("2").doc(XContentType.JSON,"other", "test").fetchSource(true));
bulkRequest.add(new IndexRequest("posts").id("4").source(XContentType.JSON,"field", "baz"));
关于BulkRequest的参数设置,除了使用BulkRequest add(IndexRequest request)等方法加入针对单个不同的文档操作请求外,其它通用参数设置同单个文档操作设置
bulkRequest.timeout(TimeValue.timeValueMinutes(2)); bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
注意,针对单个文档操作的设置,应该在add方法里面设置,如为某个更新操作进行返回结果的设置[.fetchSource(true)]:
bulkRequest.add(new UpdateRequest("posts").id("2").doc(XContentType.JSON,"other", "test").fetchSource(true));
BulkResponse 作为执行结果的接收对象,它包含执行操作的信息,且可以使用它来遍历每个操作的执行结果
for (BulkItemResponse bulkItemResponse : bulkResponse) { DocWriteResponse itemResponse = bulkItemResponse.getResponse(); if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) { IndexResponse indexResponse = (IndexResponse) itemResponse; } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) { UpdateResponse updateResponse = (UpdateResponse) itemResponse; } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) { DeleteResponse deleteResponse = (DeleteResponse) itemResponse; } }
注意的是:bulkItemResponse.getOpType() 返回的是请求问题的add方法加入的操作,而不是实际对文档进行操作的值,如添加到请求中的操作为
bulkRequest.add(new IndexRequest("posts").id("1").source(XContentType.JSON,"field", "foo"));
要是文档不存在,会自动创建一个,此时如下代码是执行的,也就是判断是创建成功是正确的
if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) { IndexResponse indexResponse = (IndexResponse) itemResponse; System.out.println("id=" + indexResponse.getId() + "的文档创建成功"); System.out.println("id=" + indexResponse.getId() + "文档操作类型:" + indexResponse.getResult()); }
但是要是文档存在,原来的文档会被更新(而非创建),如上代码依然执行,而如下判断
bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE
返回的却是false,所以要是想知道文档实际被进行的操作,可以通过如下代码进行:
DocWriteResponse itemResponse = bulkItemResponse.getResponse(); IndexResponse indexResponse = (IndexResponse) itemResponse; indexResponse.getResult()
其中itemResponse.getResult()和indexResponse.getResult()都可以获取实际的操作行为
如果elasticsearch服务器中不存在对应的值为1的文档id,会自动创建一个id为1的文档
同样,如果不存在posts文档库的话,也会根index/id据自动创建整个文档
bulkRequest.add(new IndexRequest("posts"),id("1").source(XContentType.JSON,"field", "foo"));
但是类似如下,如果posts文档库中如果已存在文档,则会报错
bulkRequest.add(new IndexRequest("posts").id("1").source(XContentType.JSON,"field", "foo"));
报错内容如下:
Rejecting mapping update to [posts] as the final mapping would have more than 1 type: [doc2, doc]
代码中可以通过如下判断是否出现这种非法的执行操作:
if (bulkItemResponse.getFailure() != null) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
if(failure.getStatus() == RestStatus.BAD_REQUEST) {
System.out.println("id=" + bulkItemResponse.getId() + "为非法的请求!");
continue;
}
}
对于IndexRequest请求操作,如果希望创建文档,而文档要是存在时不要进行更新的话,可以进行如下设置:
bulkRequest.add(new IndexRequest("posts").id("1").source(XContentType.JSON,"field", "foo").opType(DocWriteRequest.OpType.CREATE));
即添加.opType(DocWriteRequest.OpType.CREATE)设置,同时failure.getStatus() == RestStatus.CONFLICT设置不抛出异常
if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
if(bulkItemResponse.getFailure() != null && bulkItemResponse.getFailure().getStatus() == RestStatus.CONFLICT) {
System.out.println("id=" + bulkItemResponse.getId() + "与现在文档冲突");
continue;
}
IndexResponse indexResponse = (IndexResponse) itemResponse;
System.out.println("id=" + indexResponse.getId() + "的文档创建成功");
System.out.println("id=" + indexResponse.getId() + "文档操作类型:" + itemResponse.getResult());
}
对于删除操作,如果不作特别的判断,如下的话,会一直都是会进入if方法执行的(即使文档不存在)
if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) { DeleteResponse deleteResponse = (DeleteResponse) itemResponse; }
所以如果要想判断文档不存在的情况,则需要如下判断:
if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) { DeleteResponse deleteResponse = (DeleteResponse) itemResponse; if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { System.out.println("id=" + deleteResponse.getId() + "的文档未找到,未执行删除!"); }else { System.out.println("id=" + deleteResponse.getId() + "的文档删除成功"); } }
操作要指定id, 不然会报
org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: id is missing;
注意:bulk批量操作里是不允许执行get操作的,因为get操作和其它操作的参数是不同的,所以如下代码会报错:
bulkRequest.add(new GetRequest("posts").id("22"));
完整代码 ```java BulkRequest bulkRequest = new BulkRequest(); bulkRequest.timeout(“10s”); //插入数据 List
docList = new ArrayList<>(); // public Doc(String name, String description, String studymodel, Date timestamp, Double price) { docList.add(new Doc(“n-1”, “desc”, “asdas”, new Date(), 12.1)); docList.add(new Doc(“n-2”, “desc”, “fsdf”, new Date(), 12.1));
//批量处理请求 for (int i = 0; i < docList.size(); i++) { bulkRequest.add( new IndexRequest(“elasticsearch_test”) .id(“” + (i + 20)) //id自增 .source(JSON.toJSONString(docList.get(i)), XContentType.JSON) ); }
bulkRequest.add(new IndexRequest(“elasticsearch_test”).id(“57”).source(XContentType.JSON, “name”, “n-n2”, “desc”, “desc-n2”).opType(DocWriteRequest.OpType.CREATE)); bulkRequest.add(new IndexRequest(“elasticsearch_test”).id(“58”).source(XContentType.JSON, “name”, “n-n3”, “desc”, “desc-n3”));
bulkRequest.add(new DeleteRequest(“elasticsearch_test”).id(“50”)); bulkRequest.add(new UpdateRequest(“elasticsearch_test”, “20”).doc(XContentType.JSON, “desc”, “test”).fetchSource(true)); bulkRequest.add(new IndexRequest(“elasticsearch_test”).id(“21”).source(XContentType.JSON, “name”, “dasda”));
//设置主分片的超时时间 bulkRequest.timeout(TimeValue.timeValueMinutes(2)); bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
try { BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT); // 有操作执行失败 System.out.println(bulkResponse.hasFailures());
for (BulkItemResponse bulkItemResponse : bulkResponse) {
//有执行失败的
if (bulkItemResponse.getFailure() != null) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
System.out.println(failure.getCause());
if (failure.getStatus() == RestStatus.BAD_REQUEST) {
System.out.println("id=" + bulkItemResponse.getId() + "为非法的请求!");
continue;
}
}
DocWriteResponse itemResponse = bulkItemResponse.getResponse();
if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
if (bulkItemResponse.getFailure() != null && bulkItemResponse.getFailure().getStatus() == RestStatus.CONFLICT) {
System.out.println("id=" + bulkItemResponse.getId() + "与现在文档冲突");
continue;
}
IndexResponse indexResponse = (IndexResponse) itemResponse;
System.out.println("id=" + indexResponse.getId() + "的文档创建成功");
System.out.println("id=" + indexResponse.getId() + "文档操作类型:" + itemResponse.getResult());
} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
UpdateResponse updateResponse = (UpdateResponse) itemResponse;
System.out.println("id=" + updateResponse.getId() + "的文档更新成功");
System.out.println("id=" + updateResponse.getId() + "文档内容为:" + updateResponse.getGetResult().sourceAsString());
} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
System.out.println("id=" + deleteResponse.getId() + "的文档未找到,未执行删除!");
} else {
System.out.println("id=" + deleteResponse.getId() + "的文档删除成功");
}
}
}
} catch (Exception e) { e.printStackTrace(); }
<a name="CJPgs"></a>
# multiGet
BulkRequest是用来进行批量索引、更新、删除操作的请求对象。<br />批量查询的操作: Mult-Get Request
1. 使用主请求对象的add方法,将子查询对象加入到主查询中
```java
request.add(new MultiGetRequest.Item("index", "another_id"));
- 可以分别针对每一个子查询进行如下设置:
查询的文档内容不返回:.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE)
request.add(new MultiGetRequest.Item("posts", "doc", "2").fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE));
指定查询哪些字段内容或过滤掉哪些字段
String[] includes = new String[] {"user", "*r"}; String[] excludes = Strings.EMPTY_ARRAY; FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes); request.add(new MultiGetRequest.Item("posts2", "3").fetchSourceContext(fetchSourceContext));
分别指定查询的路由分片和版本等
request.add(new MultiGetRequest.Item("posts", "doc", "with_routing") .routing("some_routing")); request.add(new MultiGetRequest.Item("index", "type", "with_version") .versionType(VersionType.EXTERNAL) .version(10123L));
注:以上设置无法在主请求中设置
- 对主请求设置
preference, realtime and refresh 需要在主请求里设置,子请求中无法设置这些值
request.preference("some_preference");
// realtime的值默认为true
request.realtime(false);
request.refresh(true);
执行请求并获取结果
MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
对结果的处理及说明
可以指定处理某条查询
MultiGetItemResponse firstItem = response.getResponses()[0];
遍历查询的结果
for(MultiGetItemResponse item: response.getResponses()) { String index = item.getIndex(); String id = item.getId(); System.out.println("第" + ++count + "条-》index:" + index + "; id:" + id); if(item.getFailure() != null) { Exception e = item.getFailure().getFailure(); ElasticsearchException ee = (ElasticsearchException) e; if(ee.getMessage().contains("reason=no such index")) { System.out.println("查询的文档库不存在!"); } } GetResponse getResponse = item.getResponse(); if (getResponse.isExists()) { long version = getResponse.getVersion(); String sourceAsString = getResponse.getSourceAsString(); System.out.println("查询的结果为:"); System.out.println(sourceAsString); Map<String, Object> sourceAsMap = getResponse.getSourceAsMap(); byte[] sourceAsBytes = getResponse.getSourceAsBytes(); } else { System.out.println("没有查询到相应文档"); } }
特殊情况说明:
1)查询的index索引库不存在时,则返回结果的failure参数不为null,含报错信息,
GetResponse getResponse = item.getResponse() 得到的getResponse值为null
故为了后面数据抛出空指针异常,要做兼容处理:if(item.getFailure() != null) { Exception e = item.getFailure().getFailure(); ElasticsearchException ee = (ElasticsearchException) e; if(ee.getMessage().contains("reason=no such index")) { System.out.println("查询的文档库不存在!"); } }
2)如果文档类型type或id不存在,getResponse.isExists() 的结果是false
完整代码
// 创建查询父对象
MultiGetRequest request = new MultiGetRequest();
// 添加子查询
request.add(new MultiGetRequest.Item("posts", "1"));
request.add(new MultiGetRequest.Item("posts", "2").fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE));
String[] includes = new String[]{"name", "desc*"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
request.add(new MultiGetRequest.Item("posts2", "3").fetchSourceContext(fetchSourceContext));
// 针对每个子请求分别设置,无法在主请求中设置
// 指定去哪个分片上查询,如何指定分片上没有,不会再去其它分片查询,如果不指定,则依次轮询各个分片查询
request.add(new MultiGetRequest.Item("posts", "2")
.routing("some_routing"));
request.add(new MultiGetRequest.Item("posts", "1")
.versionType(VersionType.EXTERNAL)
.version(1L));
// preference, realtime and refresh 需要在主请求里设置,子请求中无法设置
//偏好查询
request.preference("some_preference");
// realtime的值默认为true.是否实时执行
request.realtime(false);
request.refresh(true);
MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
int count = 0;
for (MultiGetItemResponse item : response.getResponses()) {
String index = item.getIndex();
String id = item.getId();
System.out.println("第" + ++count + "条-》index:" + index + "; id:" + id);
if (item.getFailure() != null) {
Exception e = item.getFailure().getFailure();
ElasticsearchException ee = (ElasticsearchException) e;
if (ee.getMessage().contains("reason=no such index")) {
System.out.println("查询的文档库不存在!");
}
}
GetResponse getResponse = item.getResponse();
if (getResponse != null && getResponse.isExists()) {
long version = getResponse.getVersion();
String sourceAsString = getResponse.getSourceAsString();
System.out.println("查询的结果为:");
System.out.println(sourceAsString);
Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
byte[] sourceAsBytes = getResponse.getSourceAsBytes();
} else {
System.out.println("没有查询到相应文档");
}
}
嵌套文档
PUT my-nested-000001
{
"mappings": {
"properties": {
"parent": {
"type": "nested" ,
"properties": {
"children":{
"type": "nested"
}
}
}
}
}
}
对应的Java代码
这里需要注意一点,一定要将对应的字段类型定义为nested
CreateIndexRequest request = new CreateIndexRequest(indexName);
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.startObject("properties");
{
builder.startObject("parent");
{
builder.field("type", "nested");
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();
request.mapping(builder);
client.indices().create(request, RequestOptions.DEFAULT);
插入文档
Person zhang = new Person();
zhang.setFirstName("zhang");
zhang.setSecondName("xiao");
Person fa= new Person();
fa.setFirstName("zhang");
fa.setSecondName("fa");
Person ma= new Person();
ma.setFirstName("zheng");
ma.setSecondName("maa");
List<Person> parents = new ArrayList<>();
parents.add(fa);
parents.add(ma);
zhang.setParent(parents);
IndexRequest indexRequest = new IndexRequest(indexName).source(JSON.toJSONString(zhang), XContentType.JSON)
//if do not set IMMEDIATE, last step will return nothing
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
IndexResponse out= client.index(indexRequest, RequestOptions.DEFAULT);
查询文档
如果需要根据嵌套的字段查询,也不能简单的通过query进行,需要通过特定的nested方法,
GET my-index-000001/_search
{
"query": {
"nested": {
"path": "parent",
"query": {
"bool": {
"must": [
{ "match": { "user.first": "Alice" }},
{ "match": { "user.last": "Smith" }}
]
}
}
}
}
}
该方法需要一个参数path,对应为嵌套的文档字段,里面可以使用各种常规查询
Java实现如下
SearchRequest search= new SearchRequest(indexName);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
NestedQueryBuilder nested=QueryBuilders.nestedQuery("parent",QueryBuilders.matchQuery("parent.firstName", "zhang"),ScoreMode.None);
searchSourceBuilder.query(nested);
search.source(searchSourceBuilder);
SearchResponse response=client.search(search, RequestOptions.DEFAULT);
System.out.println(response.toString());
response.getHits().forEach(hi ->{
System.out.println(hi.getSourceAsString());
});