简介

官方文档
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high.html

csdn翻译的
https://blog.csdn.net/hbtj_1216/article/details/84880057

刷新策略

RefreshPolicy

可知有以下三种刷新策略:

  • RefreshPolicy#IMMEDIATE:

请求向ElasticSearch提交了数据,立即进行数据刷新,然后再结束请求。
优点:实时性高、操作延时短。
缺点:资源消耗高。

  • RefreshPolicy#WAIT_UNTIL:

请求向ElasticSearch提交了数据,等待数据完成刷新,然后再结束请求。
优点:实时性高、操作延时长。
缺点:资源消耗低。

  • RefreshPolicy#NONE:

默认策略。
请求向ElasticSearch提交了数据,不关系数据是否已经完成刷新,直接结束请求。
优点:操作延时短、资源消耗低。
缺点:实时性低。

实现此接口的主要类如下:

  • DeleteRequestBuilder
  • IndexRequestBuilder
  • UpdateRequestBuilder
  • BulkRequestBuilder

    1. DeleteRequest request = new DeleteRequest("demo_index", "1");
    2. request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);

    添加文档

    1. //添加文档
    2. /*
    3. POST /elasticsearch_test/_doc/1
    4. {
    5. "name": "spring cloud实战",
    6. "description": "本课程主要从四个章节进行讲解: 1.微服务架构入门 2.spring cloud 基础入门 3.实战Spring Boot 4.注册中心eureka。",
    7. "studymodel":"201001",
    8. "timestamp": "2020-08-22 20:09:18",
    9. "price": 5.6
    10. }
    11. */

    同步请求

    进行同步请求:使用client.index(indexRequest, RequestOptions.DEFAULT);方法 ```java @Test public void testAddDoc() throws IOException { IndexRequest indexRequest = new IndexRequest(“elasticsearch_test”);

    /** 如果操作设置为DocWriteRequest.OpType.INDEX(默认值),如果文档存在,则更新文档;如果文档不存在,则创建文档 如果操作设置为DocWriteRequest.OpType.CREATE,则是指定为创建文档操作,如果对象的文档(根据id判断)存在,则报错如下:

    1. ElasticsearchStatusException[Elasticsearch exception
    2. [type=version_conflict_engine_exception,
    3. reason=[doc][16]: version conflict, document already exists

    索引操作只能为以上两种操作值,不能为UPDATE和DELETE */ indexRequest.timeout(TimeValue.timeValueSeconds(5)); indexRequest.opType(DocWriteRequest.OpType.INDEX);

    //indexRequest.id(“2”); // 文档内容 准备json数据 Map jsonMap = new HashMap<>(); jsonMap.put(“name”, “spring cloud实战3”); jsonMap.put(“description”, “本课程主要从四个章节进行讲解3: 1.微服务架构入门 2.spring cloud 基础入门 3.实战Spring Boot 4.注册中心eureka。”); jsonMap.put(“studymodel”, “3101001”); jsonMap.put(“timestamp”, “2020-07-22 20:09:18”); jsonMap.put(“price”, 35.6); indexRequest.source(jsonMap);

    // 执行请求 try {

    1. IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
    2. DocWriteResponse.Result result = indexResponse.getResult();
    3. System.out.println(result);
    4. String index = indexResponse.getIndex();
    5. String id = indexResponse.getId();
    6. long version = indexResponse.getVersion();
    7. if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
    8. System.out.println("添加成功");
    9. System.out.println("id:" + id);
    10. System.out.println("version:" + version);
    11. System.out.println("index:" + index);
    12. } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    13. System.out.println("更新成功");
    14. System.out.println("index:" + index);
    15. System.out.println("id:" + id);
    16. System.out.println("version:" + version);
    17. }

    }catch (ElasticsearchException e) {

    1. if (e.status() == RestStatus.CONFLICT) {
    2. System.out.println("创建的文档与已存在的发生冲突");
    3. }

    }

    client.close();

}

  1. <a name="M6iyl"></a>
  2. ## 异步请求
  3. 创建异步请求的,回调对象:ActionListener<IndexResponse><br />如果执行成功,会自动调用onResponse方法,如果执行失败,会回调onFailure方法<br />可以从传入的IndexResponse和Exception类型参数中获取相关创建情况信息
  4. ```java
  5. @Test
  6. public void myTest() throws Exception {
  7. IndexRequest indexRequest = new IndexRequest("elasticsearch_test");
  8. indexRequest.timeout(TimeValue.timeValueSeconds(5));
  9. indexRequest.opType(DocWriteRequest.OpType.INDEX);
  10. //indexRequest.id("2");
  11. // 文档内容 XContentBuilder,elasticsearch内容助手会根据该对象自动生成json格式内容进行保存
  12. XContentBuilder builder = XContentFactory.jsonBuilder();
  13. builder.startObject();
  14. {
  15. builder.field("name", "spring cloud实战1");
  16. builder.timeField("timestamp", new Date());
  17. builder.field("description", "本课程主要从四个章节进行讲解3: 1.微服务架构入门 2.spring cloud 基础入门 3.实战Spring Boot 4.注册中心eureka。");
  18. builder.field("studymodel", "3101001");
  19. builder.field("price", 35.6);
  20. }
  21. builder.endObject();
  22. indexRequest.source(builder);
  23. // 执行请求
  24. ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
  25. @Override
  26. public void onResponse(IndexResponse indexResponse) {
  27. String index = indexResponse.getIndex();
  28. String id = indexResponse.getId();
  29. long version = indexResponse.getVersion();
  30. DocWriteResponse.Result result = indexResponse.getResult();
  31. System.out.println(result);
  32. if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
  33. System.out.println("添加成功");
  34. System.out.println("index:" + index);
  35. System.out.println("id:" + id);
  36. System.out.println("version:" + version);
  37. } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
  38. System.out.println("更新成功");
  39. System.out.println("index:" + index);
  40. System.out.println("id:" + id);
  41. System.out.println("version:" + version);
  42. }
  43. ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
  44. if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
  45. }
  46. if (shardInfo.getFailed() > 0) {
  47. for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
  48. String reason = failure.reason();
  49. }
  50. }
  51. }
  52. @Override
  53. public void onFailure(Exception e) {
  54. e.printStackTrace();
  55. }
  56. };
  57. //进行异步请求:将请求对象、和回调对象作为参数传入
  58. client.indexAsync(indexRequest, RequestOptions.DEFAULT, listener);
  59. TimeUnit.SECONDS.sleep(10);
  60. //不被注释掉,可能还没有将请求发送出去,连接就会被关闭,从而创建或更新失败
  61. //client.close();
  62. }

查询文档

为请求对象设置参数:

  1. 设置查询的版本

getRequest.version(2) 可以不设置,如果设置所要查询的文档版本号为2,如果当前版本号为4,则查询失败,报错如下

  1. 设置查询的参数
    查询文档中指定的字段(如message和以Date结尾的字段): String[] includes = new String[]{“message”, “*Date”}

    查询文档中所有的字段:String[] includes = Strings.EMPTY_ARRAY;

  2. 过滤掉文档中指定的字段(如message): String[] excludes = new String[]{“message”};

    不过滤文档中任何字段:String[] excludes = Strings.EMPTY_ARRAY;

  3. FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);

其中includes可以传null,此时表示查询所有的字段
excludes也可以为null,此时表示不过滤掉任何字段
注: 如果includes和excludes包含同一个字段,则excludes起作用,也就是查询的字段会被过滤掉

  1. 将设置的查询参数赋值给请求对象

    getRequest.fetchSourceContext(fetchSourceContext);

  2. 如果不希望返回结果中包含内容,可以进行如下设置

    getRequest.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);

默认是返回全部内容的

@Test
public void testGetDoc() throws IOException {
    // 查询请求对象
    GetRequest getRequest = new GetRequest("elasticsearch_test", "12");
    //可以不设置,如果设置所要查询的文档版本号为2,如果当前版本号为4,则查询失败
    //        getRequest.version(1);
    FetchSourceContext fetchSourceContext = new FetchSourceContext(true, new String[]{"name", "*price", "timestamp"}, Strings.EMPTY_ARRAY);
    getRequest.fetchSourceContext(fetchSourceContext);

    try {
        GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
        System.out.println("Index: " + getResponse.getIndex() + "; Id: " + getResponse.getId());
        if (getResponse.isExists()) {

            System.out.println("version: " + getResponse.getVersion() + "; 内容字符串: " + getResponse.getSourceAsString());//打印文档的内容
            System.out.println(getResponse);//返回的全部内容和命令式一样的
            // 得到文档内容
            Map<String, Object> sourceMap = getResponse.getSourceAsMap();
            //                byte[] sourceAsBytes = getResponse.getSourceAsBytes();
            System.out.println(sourceMap);
        } else {
            System.out.println("查询的文档不存在!");
        }

    } catch (ElasticsearchException e) {
        if (e.status() == RestStatus.NOT_FOUND) {
            System.out.println("所查询的文档库不存在!");
        }else if(e.status() == RestStatus.CONFLICT) {
            System.out.println("指定version与当前版本冲突");
        }
    }
}

全部记录

/*
       GET   /elasticsearch_test/_search
        {
          "query":{
             "match_all":{}
          }
        }
    */
@Test
public void testSearchAll() throws IOException {
    // 搜索请求对象
    SearchRequest searchRequest = new SearchRequest("elasticsearch_test");
    searchRequest.searchType(SearchType.QUERY_THEN_FETCH);
    // 搜索源构建对象
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    // 设置搜索方法
    searchSourceBuilder.query(QueryBuilders.matchAllQuery());
    // 设置查询出那些字段, 不查出那些字段
    searchSourceBuilder.fetchSource(new String[]{"name", "price", "timestamp"}, new String[]{});
    // 请求对象设置 搜索源对象
    searchRequest.source(searchSourceBuilder);
    // 使用client  执行搜索
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    // 搜索结果
    SearchHits hits = searchResponse.getHits();
    // 匹配到的总记录数
    TotalHits totalHits = hits.getTotalHits();
    System.out.println("查询到的总记录数:" + totalHits.value);
    // 得到的匹配度高的文档
    SearchHit[] searchHits = hits.getHits();
    for (SearchHit hit : searchHits) {
        String id = hit.getId();
        // 源文档的内容
        Map<String, Object> sourceMap = hit.getSourceAsMap();
        String name = (String) sourceMap.get("name");
        String timestamp = (String) sourceMap.get("timestamp");
        String description = (String) sourceMap.get("description");
        Double price = (Double) sourceMap.get("price");
        System.out.println(name);
        System.out.println(timestamp);
        System.out.println(description);
        System.out.println(price);
    }
}

更新文档

以下情况


  • 如果将request.docAsUpsert(true) 和request.scriptedUpsert(true)注释掉或都设置为false,
  • 如果将request.docAsUpsert(true) 设置为faluse,而request.scriptedUpsert(true)为true

且文档不存在,则创建出来的文档内容为:
{“created”:”2017-01-01”},

即只有request.upsert(jsonString, XContentType.JSON)中的jsonString内容被创建,而request.doc(jsonMap)中的jsonMap内容没被创建

  1. 此例中如果文档不存在,且这样设置:request.scriptedUpsert(true);、request.docAsUpsert(false);,则会创建一个空内容的文档,因为脚本中没有内容,而禁止通过doc创建新文档

  2. 如果不使用request.upsert方法,且request.scriptedUpsert(false);和request.docAsUpsert(false);都设置为false,则文档不存在时提示没有找到文档,而不会创建新的文档

  3. 如果request.docAsUpsert(true)和request.scriptedUpsert(true)都设置为true,且

    request.doc(jsonMap)被注释掉时,会报错如下: org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: script or doc is missing;2: doc must be specified if doc_as_upsert is enabled;

即如果开启动了doc_as_upsert方法,则必须使用doc方法传入需要更新的内容

  1. 注:单机不要使用如下方法,否则会报超时异常

    // request.waitForActiveShards(2); // request.waitForActiveShards(ActiveShardCount.ALL);

//创建请求对象
//索引名称,id
UpdateRequest updateRequest = new UpdateRequest("elasticsearch_test", "1");
// 等待主分片可用的超时时间
updateRequest.timeout(TimeValue.timeValueSeconds(3));
//WAIT_UNTIL 一直保持请求连接中,直接当所做的更改对于搜索查询可见时的刷新发生后,再将结果返回
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
// 如果更新的过程中,文档被其它线程进行更新的话,会产生冲突,这个为设置更新失败后重试的次数
updateRequest.retryOnConflict(3);
// 是否将文档内容作为结果返回,默认是禁止的
updateRequest.fetchSource(true);

// 设置希望在返回结果中返回的字段值
String[] includes = new String[]{"name", "t*"};
String[] excludes = Strings.EMPTY_ARRAY;
updateRequest.fetchSource(new FetchSourceContext(false, includes, excludes));
// NO OPeration,空操作检查,默认情况为true,只有原来的source和新的source存在不同的字段情况下才会重建索引,
// 如果一模一样是不会触发重建索引的,如果将detect_noop=false不管内容有没有变化都会重建索引,这一点可以通过version的值的变化来发现
updateRequest.detectNoop(true);

// 设置在更新操作执行之前,要求活动状态的分片副本数;单机不要设置,否则会报错:超时
//updateRequest.waitForActiveShards(2);
//updateRequest.waitForActiveShards(ActiveShardCount.ALL);


// jsonMap和jsonString只是两种不同的传参方式,可以相互转换使用,效果相同
//更新后的数据
//        Doc doc = new Doc();
//        doc.setName("spring cloud实战111");
//        updateRequest.doc(JSON.toJSONString(doc), XContentType.JSON);

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("name", "spring cloud实战3252");
jsonMap.put("description", "本课程主要从四个章节进行讲解3: 1.微服务架构入门 2.spring cloud 基础入门 3.实战Spring Boot 4.注册中心eureka。");
jsonMap.put("studymodel", "3101001");
jsonMap.put("timestamp", "2020-07-22 20:09:18");
jsonMap.put("price", 35.6);
updateRequest.doc(jsonMap);

// true,表明如果文档不存在,则新更新的文档内容作为新的内容插入文档,这个和scriptedUpsert的区别是:更新文档的两种不同方式,有的使用doc方法更新有的使用脚本更新
updateRequest.docAsUpsert(true);
// 为true,表明无论文档是否存在,脚本都会执行(如果不存在时,会创建一个新的文档)
updateRequest.scriptedUpsert(true);
// 如果文档不存在,使用upsert方法,会根据更新内容创建新的文档
// 需要更新的内容,以json字符串方式提供, 可以对象转
String jsonString = "{\"timestamp\":\"2020-07-11 20:09:18\"}";
updateRequest.upsert(jsonString, XContentType.JSON);

try {
    UpdateResponse updateResponse = client.update(updateRequest, RequestOptions.DEFAULT);
    if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
        System.out.println("文档创建成功!");
    }else if(updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
        // 任何一个字段的更新,都算更新操作,即使只是日期字段的值变化
        System.out.println("文档更新成功!");
    }else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
        System.out.println("文档删除成功!");
    } else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
        // 如果request.detectNoop(true);中设置为false,则这个永远不会进入
        System.out.println("文档无变化!");
    }
    String index = updateResponse.getIndex();
    String type = updateResponse.getType();
    String id = updateResponse.getId();
    long version = updateResponse.getVersion();
    System.out.println("index:" + index + "; type:" + type + "; id:" + id + ",version:" + version);
    ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
    if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
        System.out.println("未完全执行所有分片,总分片数为:" + shardInfo.getTotal() + ",执行的分片数为:"+ shardInfo.getSuccessful());
    }
    // fetchSource 如果设置需要返回结果中包含内容了,如果没有设置返回内容,则result 等于null
    GetResult result = updateResponse.getGetResult();
    if(result == null) {
        System.out.println("无内容结果返回");
    }else if (result.isExists()) {
        // 此例中如果文档不存在,且这样设置:request.scriptedUpsert(true);、request.docAsUpsert(false);,则会创建一个空内容的文档,因为脚本中没有内容,而禁止doc创建新文档
        String sourceAsString = result.sourceAsString();
        System.out.println(sourceAsString);
        Map<String, Object> sourceAsMap = result.sourceAsMap();
    }
} catch (ElasticsearchException e) {
    if (e.status() == RestStatus.NOT_FOUND) {
        // 如果不使用request.upsert方法,且request.scriptedUpsert(false);和request.docAsUpsert(false);都设置为false,则文档不存在时提示没有找到文档
        System.out.println("文档不存在");
    }else if(e.status() == RestStatus.CONFLICT) {
        System.out.println("需要删除的文档版本与现在文档冲突!");
    }
} catch (Exception e) {
    e.printStackTrace();
}

版本控制

查询

// 查询请求对象
GetRequest getRequest = new GetRequest("book", "6");
//可以不设置,如果设置所要查询的文档版本号为2,如果当前版本号为4,则查询失败
//        getRequest.version(1);
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, new String[]{"name", "*price", "timestamp"}, Strings.EMPTY_ARRAY);
getRequest.fetchSourceContext(fetchSourceContext);

try {
    GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
    System.out.println(getResponse.getPrimaryTerm() + "---" + getResponse.getSeqNo());

操作时带上

IndexRequest indexRequest = new IndexRequest("elasticsearch_test");
indexRequest.timeout(TimeValue.timeValueSeconds(5));
indexRequest.opType(DocWriteRequest.OpType.INDEX);

indexRequest.setIfPrimaryTerm(1L);
indexRequest.setIfSeqNo(11L);

删除文档

  1. 创建请求对象

    DeleteRequest request = new DeleteRequest(“posts”, “20”); //指定版本 DeleteRequest request = new DeleteRequest(“demo_index”, “1”).version(2);

  2. 设置请求参数

    // 等待主分片可用的超时时间 request.timeout(TimeValue.timeValueMinutes(10));

  3. 执行同步请求

    DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);

  4. 查看返回结果 ```java // 判断删除的文档是否存在 deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND

// 查看分片执行情况信息 ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo(); if (shardInfo.getTotal() != shardInfo.getSuccessful()) { System.out.println(“未完全执行所有分片,总分片数为:” + shardInfo.getTotal() + “,执行的分片数为:”+ shardInfo.getSuccessful()); }

if (shardInfo.getFailed() > 0) { for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) { String reason = failure.reason(); System.out.println(“失败原因:” + reason); return; } }

<a name="TDC2j"></a>
## 同步删除
```java
//创建请求对象
//索引名称,id
DeleteRequest request = new DeleteRequest("elasticsearch_test", "12");
//        DeleteRequest request = new DeleteRequest("demo_index", "1").version(2);
// 等待主分片可用的超时时间
request.timeout(TimeValue.timeValueMinutes(10));
//WAIT_UNTIL 一直保持请求连接中,直接当所做的更改对于搜索查询可见时的刷新发生后,再将结果返回
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);

try {
    DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
    System.out.println(deleteResponse.status());

    String index = deleteResponse.getIndex();
    String id = deleteResponse.getId();
    long version = deleteResponse.getVersion();
    System.out.println("index:" + index +  "; id:" + id + ",version:" + version);
    ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
    if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
        System.out.println("未完全执行所有分片,总分片数为:" + shardInfo.getTotal() + ",执行的分片数为:"+ shardInfo.getSuccessful());
        // return;
    }
    if (shardInfo.getFailed() > 0) {
        for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
            String reason = failure.reason();
            System.out.println("失败原因:" + reason);
            return;
        }
    }

    if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
        System.out.println("未找到需要删除的文档!");
        return;
    }
} catch (ElasticsearchException e) {
    if(e.status() == RestStatus.CONFLICT) {
        System.out.println("需要删除的文档版本与现在文档冲突!");
    }
} catch (Exception e) {
    e.printStackTrace();
}

异步删除

注意

  1. 所有的异步操作,一定不要将连接client关闭太早,否则异步操作还没执行程序就被终止了

所以同步方法创作的连接对象可以放在try(client)条件中,执行完自动关闭。而异步方法的连接则不可以

  1. 注意:如果对一个文档添加版本条件(如id为2,版本为2)进行删除(删除后文档version变为3),再次删除时,

    DeleteRequest request = new DeleteRequest(“posts”, “2”).version(2);

报错如下,而不是提示找不到文档:

{
    "error":{
        "root_cause":[
            {
                "type":"version_conflict_engine_exception",
                "reason":"[doc][2]: version conflict, current version [3] is different than the one provided [2]",
                "index_uuid":"60e-U9cXSYqFGC34_gTrug",
                "shard":"2",
                "index":"posts"
            }
        ],
        "type":"version_conflict_engine_exception",
        "reason":"[doc][2]: version conflict, current version [3] is different than the one provided [2]",
        "index_uuid":"60e-U9cXSYqFGC34_gTrug",
        "shard":"2",
        "index":"posts"
    },
    "status":409
}
//创建请求对象
//索引名称,id
DeleteRequest request = new DeleteRequest("elasticsearch_test", "12");
//        DeleteRequest request = new DeleteRequest("elasticsearch_test", "1").version(2);
// 等待主分片可用的超时时间
request.timeout(TimeValue.timeValueMinutes(10));
//WAIT_UNTIL 一直保持请求连接中,直接当所做的更改对于搜索查询可见时的刷新发生后,再将结果返回
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);

// 异步回调对象
ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() {

    @Override
    public void onResponse(DeleteResponse deleteResponse) {
        if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
            System.out.println("未找到需要删除的文档!");
            return;
        }
        String index = deleteResponse.getIndex();
        String id = deleteResponse.getId();
        long version = deleteResponse.getVersion();
        System.out.println("index:" + index + id + ",version:" + version);
        ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
        if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
            System.out.println("未完全执行所有分片,总分片数为:" + shardInfo.getTotal() + ",执行的分片数为:"+ shardInfo.getSuccessful());
        }
        if (shardInfo.getFailed() > 0) {
            for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                String reason = failure.reason();
                System.out.println("失败原因:" + reason);
                return;
            }
        }
    }

    @Override
    public void onFailure(Exception e) {

    }
};
client.deleteAsync(request, RequestOptions.DEFAULT, listener);

TimeUnit.SECONDS.sleep(1);

批量操作数据

  1. BulkRequest对象可以用来在一次请求中,执行多个索引、更新或删除操作

且允许在一次请求中进行不同的操作,即一次请求中索引、更新、删除操作可以同时存在

BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(new DeleteRequest("posts").id("300"));
bulkRequest.add(new UpdateRequest("posts").id("2").doc(XContentType.JSON,"other", "test").fetchSource(true));
bulkRequest.add(new IndexRequest("posts").id("4").source(XContentType.JSON,"field", "baz"));
  1. 关于BulkRequest的参数设置,除了使用BulkRequest add(IndexRequest request)等方法加入针对单个不同的文档操作请求外,其它通用参数设置同单个文档操作设置

    bulkRequest.timeout(TimeValue.timeValueMinutes(2));
    bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
    
  2. 注意,针对单个文档操作的设置,应该在add方法里面设置,如为某个更新操作进行返回结果的设置[.fetchSource(true)]:

    bulkRequest.add(new UpdateRequest("posts").id("2").doc(XContentType.JSON,"other", "test").fetchSource(true));
    
  3. BulkResponse 作为执行结果的接收对象,它包含执行操作的信息,且可以使用它来遍历每个操作的执行结果

    for (BulkItemResponse bulkItemResponse : bulkResponse) { 
     DocWriteResponse itemResponse = bulkItemResponse.getResponse(); 
    
     if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
             || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) { 
         IndexResponse indexResponse = (IndexResponse) itemResponse;
    
     } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) { 
         UpdateResponse updateResponse = (UpdateResponse) itemResponse;
    
     } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) { 
         DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
     }
    }
    

    注意的是:bulkItemResponse.getOpType() 返回的是请求问题的add方法加入的操作,而不是实际对文档进行操作的值,如添加到请求中的操作为

    bulkRequest.add(new IndexRequest("posts").id("1").source(XContentType.JSON,"field", "foo"));
    

    要是文档不存在,会自动创建一个,此时如下代码是执行的,也就是判断是创建成功是正确的

    if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
      IndexResponse indexResponse = (IndexResponse) itemResponse;
      System.out.println("id=" + indexResponse.getId() + "的文档创建成功");
      System.out.println("id=" + indexResponse.getId() + "文档操作类型:" + indexResponse.getResult());
    }
    

    但是要是文档存在,原来的文档会被更新(而非创建),如上代码依然执行,而如下判断

    bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE
    

    返回的却是false,所以要是想知道文档实际被进行的操作,可以通过如下代码进行:

    DocWriteResponse itemResponse = bulkItemResponse.getResponse();
    IndexResponse indexResponse = (IndexResponse) itemResponse;
    indexResponse.getResult()
    

    其中itemResponse.getResult()和indexResponse.getResult()都可以获取实际的操作行为

  4. 如果elasticsearch服务器中不存在对应的值为1的文档id,会自动创建一个id为1的文档

同样,如果不存在posts文档库的话,也会根index/id据自动创建整个文档

bulkRequest.add(new IndexRequest("posts"),id("1").source(XContentType.JSON,"field", "foo"));

但是类似如下,如果posts文档库中如果已存在文档,则会报错

bulkRequest.add(new IndexRequest("posts").id("1").source(XContentType.JSON,"field", "foo"));

报错内容如下:

Rejecting mapping update to [posts] as the final mapping would have more than 1 type: [doc2, doc]

代码中可以通过如下判断是否出现这种非法的执行操作:

if (bulkItemResponse.getFailure() != null) {
    BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
    if(failure.getStatus() == RestStatus.BAD_REQUEST) {
        System.out.println("id=" + bulkItemResponse.getId() + "为非法的请求!");
        continue;
    }
}

对于IndexRequest请求操作,如果希望创建文档,而文档要是存在时不要进行更新的话,可以进行如下设置:

bulkRequest.add(new IndexRequest("posts").id("1").source(XContentType.JSON,"field", "foo").opType(DocWriteRequest.OpType.CREATE));

即添加.opType(DocWriteRequest.OpType.CREATE)设置,同时failure.getStatus() == RestStatus.CONFLICT设置不抛出异常

if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
    if(bulkItemResponse.getFailure() != null && bulkItemResponse.getFailure().getStatus() == RestStatus.CONFLICT) {
       System.out.println("id=" + bulkItemResponse.getId() + "与现在文档冲突");
       continue;
    }
    IndexResponse indexResponse = (IndexResponse) itemResponse;
    System.out.println("id=" + indexResponse.getId() + "的文档创建成功");
    System.out.println("id=" + indexResponse.getId() + "文档操作类型:" + itemResponse.getResult());
}
  1. 对于删除操作,如果不作特别的判断,如下的话,会一直都是会进入if方法执行的(即使文档不存在)

    if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) { 
     DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
    }
    

    所以如果要想判断文档不存在的情况,则需要如下判断:

    if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
     DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
     if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
         System.out.println("id=" + deleteResponse.getId() + "的文档未找到,未执行删除!");
     }else {
         System.out.println("id=" + deleteResponse.getId() + "的文档删除成功");
     }
    }
    
  2. 操作要指定id, 不然会报

    org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: id is missing;

  3. 注意:bulk批量操作里是不允许执行get操作的,因为get操作和其它操作的参数是不同的,所以如下代码会报错:

    bulkRequest.add(new GetRequest("posts").id("22"));
    

    完整代码 ```java BulkRequest bulkRequest = new BulkRequest(); bulkRequest.timeout(“10s”); //插入数据 List docList = new ArrayList<>(); // public Doc(String name, String description, String studymodel, Date timestamp, Double price) { docList.add(new Doc(“n-1”, “desc”, “asdas”, new Date(), 12.1)); docList.add(new Doc(“n-2”, “desc”, “fsdf”, new Date(), 12.1));

//批量处理请求 for (int i = 0; i < docList.size(); i++) { bulkRequest.add( new IndexRequest(“elasticsearch_test”) .id(“” + (i + 20)) //id自增 .source(JSON.toJSONString(docList.get(i)), XContentType.JSON) ); }

bulkRequest.add(new IndexRequest(“elasticsearch_test”).id(“57”).source(XContentType.JSON, “name”, “n-n2”, “desc”, “desc-n2”).opType(DocWriteRequest.OpType.CREATE)); bulkRequest.add(new IndexRequest(“elasticsearch_test”).id(“58”).source(XContentType.JSON, “name”, “n-n3”, “desc”, “desc-n3”));

bulkRequest.add(new DeleteRequest(“elasticsearch_test”).id(“50”)); bulkRequest.add(new UpdateRequest(“elasticsearch_test”, “20”).doc(XContentType.JSON, “desc”, “test”).fetchSource(true)); bulkRequest.add(new IndexRequest(“elasticsearch_test”).id(“21”).source(XContentType.JSON, “name”, “dasda”));

//设置主分片的超时时间 bulkRequest.timeout(TimeValue.timeValueMinutes(2)); bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);

try { BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT); // 有操作执行失败 System.out.println(bulkResponse.hasFailures());

for (BulkItemResponse bulkItemResponse : bulkResponse) {

    //有执行失败的
    if (bulkItemResponse.getFailure() != null) {
        BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
        System.out.println(failure.getCause());
        if (failure.getStatus() == RestStatus.BAD_REQUEST) {
            System.out.println("id=" + bulkItemResponse.getId() + "为非法的请求!");
            continue;
        }
    }
    DocWriteResponse itemResponse = bulkItemResponse.getResponse();

    if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
        if (bulkItemResponse.getFailure() != null && bulkItemResponse.getFailure().getStatus() == RestStatus.CONFLICT) {
            System.out.println("id=" + bulkItemResponse.getId() + "与现在文档冲突");
            continue;
        }
        IndexResponse indexResponse = (IndexResponse) itemResponse;
        System.out.println("id=" + indexResponse.getId() + "的文档创建成功");
        System.out.println("id=" + indexResponse.getId() + "文档操作类型:" + itemResponse.getResult());
    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
        UpdateResponse updateResponse = (UpdateResponse) itemResponse;
        System.out.println("id=" + updateResponse.getId() + "的文档更新成功");
        System.out.println("id=" + updateResponse.getId() + "文档内容为:" + updateResponse.getGetResult().sourceAsString());
    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
        if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
            System.out.println("id=" + deleteResponse.getId() + "的文档未找到,未执行删除!");
        } else {
            System.out.println("id=" + deleteResponse.getId() + "的文档删除成功");
        }
    }
}

} catch (Exception e) { e.printStackTrace(); }

<a name="CJPgs"></a>
# multiGet
BulkRequest是用来进行批量索引、更新、删除操作的请求对象。<br />批量查询的操作: Mult-Get Request

1. 使用主请求对象的add方法,将子查询对象加入到主查询中
```java
request.add(new MultiGetRequest.Item("index", "another_id"));
  1. 可以分别针对每一个子查询进行如下设置:
  • 查询的文档内容不返回:.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE)

    request.add(new MultiGetRequest.Item("posts", "doc", "2").fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE));
    
  • 指定查询哪些字段内容或过滤掉哪些字段

    String[] includes = new String[] {"user", "*r"};
    String[] excludes = Strings.EMPTY_ARRAY;
    FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
    request.add(new MultiGetRequest.Item("posts2", "3").fetchSourceContext(fetchSourceContext));
    
  • 分别指定查询的路由分片和版本等

    request.add(new MultiGetRequest.Item("posts", "doc", "with_routing")
                      .routing("some_routing"));
    request.add(new MultiGetRequest.Item("index", "type", "with_version")
                      .versionType(VersionType.EXTERNAL)
                      .version(10123L));
    

    注:以上设置无法在主请求中设置

  1. 对主请求设置

    preference, realtime and refresh 需要在主请求里设置,子请求中无法设置这些值

request.preference("some_preference");
// realtime的值默认为true
request.realtime(false);
request.refresh(true);
  1. 执行请求并获取结果

    MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);

  2. 对结果的处理及说明

  • 可以指定处理某条查询

    MultiGetItemResponse firstItem = response.getResponses()[0];

  • 遍历查询的结果

    for(MultiGetItemResponse item: response.getResponses()) {
      String index = item.getIndex();
      String id = item.getId();
      System.out.println("第" + ++count + "条-》index:" + index + "; id:" + id);
      if(item.getFailure() != null) {
          Exception e = item.getFailure().getFailure();
          ElasticsearchException ee = (ElasticsearchException) e;
          if(ee.getMessage().contains("reason=no such index")) {
              System.out.println("查询的文档库不存在!");
          }
      }
    
      GetResponse getResponse = item.getResponse();
    
      if (getResponse.isExists()) {
          long version = getResponse.getVersion();
          String sourceAsString = getResponse.getSourceAsString();
          System.out.println("查询的结果为:");
          System.out.println(sourceAsString);
          Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
          byte[] sourceAsBytes = getResponse.getSourceAsBytes();
      } else {
          System.out.println("没有查询到相应文档");
      }
    }
    

    特殊情况说明:
    1)查询的index索引库不存在时,则返回结果的failure参数不为null,含报错信息,
    GetResponse getResponse = item.getResponse() 得到的getResponse值为null
    故为了后面数据抛出空指针异常,要做兼容处理:

    if(item.getFailure() != null) {
     Exception e = item.getFailure().getFailure();
     ElasticsearchException ee = (ElasticsearchException) e;
     if(ee.getMessage().contains("reason=no such index")) {
        System.out.println("查询的文档库不存在!");
     }
    }
    

    2)如果文档类型type或id不存在,getResponse.isExists() 的结果是false

完整代码

// 创建查询父对象
MultiGetRequest request = new MultiGetRequest();
// 添加子查询
request.add(new MultiGetRequest.Item("posts", "1"));
request.add(new MultiGetRequest.Item("posts", "2").fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE));
String[] includes = new String[]{"name", "desc*"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
request.add(new MultiGetRequest.Item("posts2", "3").fetchSourceContext(fetchSourceContext));

// 针对每个子请求分别设置,无法在主请求中设置
// 指定去哪个分片上查询,如何指定分片上没有,不会再去其它分片查询,如果不指定,则依次轮询各个分片查询
request.add(new MultiGetRequest.Item("posts", "2")
            .routing("some_routing"));

request.add(new MultiGetRequest.Item("posts", "1")
            .versionType(VersionType.EXTERNAL)
            .version(1L));

// preference, realtime and refresh 需要在主请求里设置,子请求中无法设置
//偏好查询
request.preference("some_preference");
// realtime的值默认为true.是否实时执行
request.realtime(false);
request.refresh(true);
MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
int count = 0;
for (MultiGetItemResponse item : response.getResponses()) {
    String index = item.getIndex();
    String id = item.getId();
    System.out.println("第" + ++count + "条-》index:" + index + "; id:" + id);
    if (item.getFailure() != null) {
        Exception e = item.getFailure().getFailure();
        ElasticsearchException ee = (ElasticsearchException) e;
        if (ee.getMessage().contains("reason=no such index")) {
            System.out.println("查询的文档库不存在!");
        }
    }

    GetResponse getResponse = item.getResponse();
    if (getResponse != null && getResponse.isExists()) {
        long version = getResponse.getVersion();
        String sourceAsString = getResponse.getSourceAsString();
        System.out.println("查询的结果为:");
        System.out.println(sourceAsString);
        Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
        byte[] sourceAsBytes = getResponse.getSourceAsBytes();
    } else {
        System.out.println("没有查询到相应文档");
    }
}

嵌套文档

PUT my-nested-000001
{
  "mappings": {
    "properties": {
      "parent": {
        "type": "nested" ,
        "properties": {
          "children":{
            "type": "nested"
          }
        }

      }
    }
  }
}

对应的Java代码
这里需要注意一点,一定要将对应的字段类型定义为nested

CreateIndexRequest request = new CreateIndexRequest(indexName);
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
    builder.startObject("properties");
    {
        builder.startObject("parent");
        {
            builder.field("type", "nested");
        }
        builder.endObject();
    }
    builder.endObject();
}
builder.endObject();
request.mapping(builder);
client.indices().create(request, RequestOptions.DEFAULT);

插入文档

Person zhang = new Person();
zhang.setFirstName("zhang");
zhang.setSecondName("xiao");
Person fa= new Person();
fa.setFirstName("zhang");
fa.setSecondName("fa");
Person ma= new Person();
ma.setFirstName("zheng");
ma.setSecondName("maa");
List<Person> parents = new ArrayList<>();
parents.add(fa);
parents.add(ma);
zhang.setParent(parents);
IndexRequest indexRequest = new IndexRequest(indexName).source(JSON.toJSONString(zhang), XContentType.JSON)
    //if do not set IMMEDIATE, last step will return nothing
    .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
IndexResponse out= client.index(indexRequest, RequestOptions.DEFAULT);

查询文档
如果需要根据嵌套的字段查询,也不能简单的通过query进行,需要通过特定的nested方法,

GET my-index-000001/_search
{
  "query": {
    "nested": {
      "path": "parent",
      "query": {
        "bool": {
          "must": [
            { "match": { "user.first": "Alice" }},
            { "match": { "user.last":  "Smith" }} 
          ]
        }
      }
    }
  }
}

该方法需要一个参数path,对应为嵌套的文档字段,里面可以使用各种常规查询
Java实现如下

SearchRequest search= new SearchRequest(indexName);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
NestedQueryBuilder nested=QueryBuilders.nestedQuery("parent",QueryBuilders.matchQuery("parent.firstName", "zhang"),ScoreMode.None);
searchSourceBuilder.query(nested);
search.source(searchSourceBuilder);
SearchResponse response=client.search(search, RequestOptions.DEFAULT);
System.out.println(response.toString());
response.getHits().forEach(hi ->{
    System.out.println(hi.getSourceAsString());
});