IndexAPI

Index Request

一个IndexRequest 需要以下参数:

  1. IndexRequest indexRequest = new IndexRequest(
  2. "posts", // 索引名
  3. "doc", // 类型
  4. "1"); // 文档id
  5. // 文档数据
  6. String jsonString = "{" +
  7. "\"user\":\"kimchy\"," +
  8. "\"postDate\":\"2013-01-30\"," +
  9. "\"message\":\"trying out Elasticsearch\"" +
  10. "}";
  11. // 设置数据源和类型
  12. indexRequest.source(jsonString, XContentType.JSON);

提供文档数据源

除了上面显示的字符串示例外,还可以以不同的方式提供文档源:

  1. Map<String, Object> jsonMap = new HashMap<>();
  2. jsonMap.put("user", "kimchy");
  3. jsonMap.put("postDate", new Date());
  4. jsonMap.put("message", "trying out Elasticsearch");
  5. IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
  6. .source(jsonMap); // (1)

(1)提供一个map,这将会自动转换为json格式。

  1. XContentBuilder builder = XContentFactory.jsonBuilder();
  2. builder.startObject();
  3. {
  4. builder.field("user", "kimchy");
  5. builder.timeField("postDate", new Date());
  6. builder.field("message", "trying out Elasticsearch");
  7. }
  8. builder.endObject();
  9. IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
  10. .source(builder); // (1)

(1)提供一个XContentBuilder,这会在内部自动转为json格式内容。

  1. IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
  2. .source("user", "kimchy",
  3. "postDate", new Date(),
  4. "message", "trying out Elasticsearch"); // (1)

(1)直接提供键值对,这会在内部自动转为json格式内容。

可选参数

以下的参数时可选的:

  1. indexRequest.routing("routing"); // 设置分片路由
  2. indexRequest.parent("parent"); // 设置父文档id
  3. indexRequest.timeout(TimeValue.timeValueSeconds(1)); // 等待主分片可用的超时时间, TimeValue形式
  4. indexRequest.timeout("1s"); // 等待主分片可用的超时时间, String形式
  5. indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); // 刷新策略
  6. indexRequest.setRefreshPolicy("wait_for"); // 刷新策略
  7. indexRequest.version(2); // 版本
  8. indexRequest.versionType(VersionType.EXTERNAL); // 版本类型
  9. indexRequest.opType(DocWriteRequest.OpType.CREATE); // 操作类型
  10. indexRequest.opType("create"); // 操作类型
  11. indexRequest.setPipeline("pipeline"); // 管道名称

同步执行

当使用以下方式发送indexRequest,客户端调用线程将会阻塞直到返回IndexResponse对象:

  1. IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);

异步执行

也可以通过异步的方式执行indexRequest,调用者需要传递一个监听器来处理异步调用返回的Response。

  1. client.indexAsync(request, RequestOptions.DEFAULT, listener);

异步方法不会阻塞,会立即返回。一旦返回结果,如果成功则会回调ActionListener的onResponse方法,如果调用失败,则会回调onFailure方法。
典型的listener 如下:

  1. ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
  2. @Override
  3. public void onResponse(IndexResponse indexResponse) {
  4. // 成功
  5. }
  6. @Override
  7. public void onFailure(Exception e) {
  8. // 失败
  9. }
  10. };

Index Response

从返回的IndexResponse中可以获取执行的操作的信息:

  1. String index = indexResponse.getIndex(); // 获得索引
  2. String type = indexResponse.getType(); // 获得类型
  3. String id = indexResponse.getId(); // 获得文档id
  4. long version = indexResponse.getVersion(); //获得版本
  5. if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
  6. // 当文档第一次创建后处理
  7. } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
  8. // 当文档已存在,更新后处理
  9. }
  10. ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo(); // 获取分片信息
  11. if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
  12. // 当处理成功的芬片数量不等于总数量时处理
  13. }
  14. if (shardInfo.getFailed() > 0) {
  15. for (ReplicationResponse.ShardInfo.Failure failure :
  16. shardInfo.getFailures()) {
  17. String reason = failure.reason(); // 获取失败原因,并处理
  18. }
  19. }

如果发生了版本冲突,则会抛出ElasticsearchException 异常:

  1. IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
  2. .source("field", "value")
  3. .version(1);
  4. try {
  5. IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
  6. } catch(ElasticsearchException e) {
  7. if (e.status() == RestStatus.CONFLICT) {
  8. // 处理抛出的异常,版本冲突
  9. }
  10. }

如果操作类型被设置成create,但是文档已经存在,同样会抛出异常:

IndexRequest request = new IndexRequest("posts", "doc", "1")
        .source("field", "value")
        .opType(DocWriteRequest.OpType.CREATE);
try {
    IndexResponse response = client.index(request, RequestOptions.DEFAULT);
} catch(ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
        // 处理抛出的异常,冲突
    }
}

Get API

获取文档

Get Request

一个GetRequest对象需要以下参数:

GetRequest getRequest = new GetRequest(
        "posts", // 索引
        "doc",   // 类型
        "1");    // 文档id

可选参数

下面的参数是可选的

getRequest.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE); // 禁用数据源检索

String[] includes = new String[]{"message", "*Date"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext =
        new FetchSourceContext(true, includes, excludes);
getRequest.fetchSourceContext(fetchSourceContext); // 配置特定字段的检索

String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"message"};
FetchSourceContext fetchSourceContext =
        new FetchSourceContext(true, includes, excludes);
getRequest.fetchSourceContext(fetchSourceContext); // 配置特定字段的检索

getRequest.storedFields("message"); // 为特定存储字段配置检索(要求字段单独存储在映射中)
GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
String message = getResponse.getField("message").getValue(); // 检索存储的message字段(要求字段单独存储在映射中)

getRequest.routing("routing"); // 设置分片路由

getRequest.parent("parent"); // 设置父文档id

getRequest.preference("preference"); // 设置偏向

getRequest.realtime(false); // 将实时标志设置为false(默认为true)

getRequest.refresh(true); // 在检索文档之前执行一次刷新(默认为false)

getRequest.version(2); // 设置版本

getRequest.versionType(VersionType.EXTERNAL); // 设置版本类型

同步执行

同步调用,会阻塞。

GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);

异步执行

异步执行不阻塞,需要提供监听器。

client.getAsync(getRequest, RequestOptions.DEFAULT, listener);

异步方法不会阻塞,会立即返回。一旦返回结果,如果成功则会回调ActionListener的onResponse方法,如果调用失败,则会回调onFailure方法。
典型的listener 如下:

ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
    @Override
    public void onResponse(GetResponse getResponse) {
        // 成功
    }

    @Override
    public void onFailure(Exception e) {
        // 失败
    }
};

Get Response

返回的GetResponse对象,允许检索请求的文档及其元数据和最终存储的字段。

String index = getResponse.getIndex(); // 获的索引
String type = getResponse.getType();   // 获得类型
String id = getResponse.getId();       // 获得文档id
if (getResponse.isExists()) {
    long version = getResponse.getVersion(); // 获得版本
    String sourceAsString = getResponse.getSourceAsString(); // 检索文档并生成字符串     
    Map<String, Object> sourceAsMap = getResponse.getSourceAsMap(); // 检索文档并生成map
    byte[] sourceAsBytes = getResponse.getSourceAsBytes(); // 检索文档并生成byte数组
} else {
    // 没有找到文档,isExists返回false
}

当对不存在的索引执行get请求时,响应返回404状态码,抛出一个ElasticsearchException,需要按如下方式处理:

GetRequest getRequest = new GetRequest("does_not_exist", "doc", "1");
try {
    GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
} catch (ElasticsearchException e) {
    if (e.status() == RestStatus.NOT_FOUND) {
        // 索引不存在,抛出异常
    }
}

如果请求的是特定的文档版本,而与现有文档的版本号不同,则会引发版本冲突:

try {
    GetRequest getRequest = new GetRequest("posts", "doc", "1").version(2);
    GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
} catch (ElasticsearchException exception) {
    if (exception.status() == RestStatus.CONFLICT) {
        // 版本发生冲突
    }
}

Exists API

Exists API用于查看文档是否存在,存在返回true,不存在返回false。

Exists Request

它使用GetRequest。因为exists()只会返回true或者false,建议关闭查询_source和任何存储字段,这样请求会变的轻量许多:

GetRequest getRequest = new GetRequest(
    "posts", // 索引
    "doc",   // 类型
    "1");    // 文档id
getRequest.fetchSourceContext(new FetchSourceContext(false)); // 关闭检索_source
getRequest.storedFields("_none_"); // 关闭检索存储的字段

同步执行

当使用同步执行方式,客户端调用线程会阻塞直到函数返回布尔值:

boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);

异步执行

异步执行不阻塞,需要提供监听器。

client.existsAsync(getRequest, RequestOptions.DEFAULT, listener);

异步方法不会阻塞,会立即返回。一旦返回结果,如果成功则会回调ActionListener的onResponse方法,如果调用失败,则会回调onFailure方法。
典型的listener 如下:

ActionListener<Boolean> listener = new ActionListener<Boolean>() {
    @Override
    public void onResponse(Boolean exists) {
        // 成功返回布尔值,true或false
    }

    @Override
    public void onFailure(Exception e) {
        // 失败
    }
};

Delete API

Delete API用于删除文档。

Delete Request

需要一个DeleteRequest对象:

DeleteRequest deleteRequest = new DeleteRequest(
        "posts", // 索引
        "doc",   // 类型
        "1");    // 文档id

可选参数

以下是一些可选参数:

deleteRequest.routing("routing"); // 设置分片路由

deleteRequest.parent("parent"); // 设置父文档id

deleteRequest.timeout(TimeValue.timeValueMinutes(2)); // 等待主分片可访问的超时时间
deleteRequest.timeout("2m");                          // 等待主分片可访问的超时时间

deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); // 设置刷新策略
deleteRequest.setRefreshPolicy("wait_for");                            // 设置刷新策略

deleteRequest.version(2); // 设置版本

deleteRequest.versionType(VersionType.EXTERNAL); // 设置版本类型

同步执行

同步调用,会阻塞,直到返回DeleteResponse对象:

DeleteResponse deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT);

异步执行

异步执行不阻塞,需要提供监听器。

client.deleteAsync(deleteRequest, RequestOptions.DEFAULT, listener);

异步方法不会阻塞,会立即返回。一旦返回结果,如果成功则会回调ActionListener的onResponse方法,如果调用失败,则会回调onFailure方法。
典型的listener 如下:

listener = new ActionListener<DeleteResponse>() {
    @Override
    public void onResponse(DeleteResponse deleteResponse) {
        // 成功
    }

    @Override
    public void onFailure(Exception e) {
        // 失败
    }
};

Delete Response

可以从返回的DeleteResponse对象中获得一些执行信息:

String index = deleteResponse.getIndex(); // 获得索引
String type = deleteResponse.getType();   // 获得类型
String id = deleteResponse.getId();       // 获得文档id
long version = deleteResponse.getVersion(); // 获得版本
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo(); // 获得分片信息
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    // 当成功的分片数量小于总数量时,执行的操作
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure :
            shardInfo.getFailures()) {
        String reason = failure.reason(); // 获取失败原因
    }
}

也可以很容易的判断文档是否存在:

DeleteRequest deleteRequest = new DeleteRequest("posts", "doc", "does_not_exist");
DeleteResponse deleteResponse = client.delete(
        deleteRequest, RequestOptions.DEFAULT);
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
    // 未检索到文档
}

如果发生了版本冲突,会抛出ElasticsearchException 异常:

try {
    DeleteResponse deleteResponse = client.delete(
            new DeleteRequest("posts", "doc", "1").version(2),
            RequestOptions.DEFAULT);
} catch (ElasticsearchException exception) {
    if (exception.status() == RestStatus.CONFLICT) {
        // 版本冲突
    }
}

Update API

Update API用于更新文档。

Update Request

需要提供一个UpdateRequest对象:
Update API允许你通过脚本或者传递一个文档的部分信息来更新这个已经存在的文档。

UpdateRequest updateRequest = new UpdateRequest(
        "posts", // 索引
        "doc",   // 类型
        "1");    // 文档id

通过脚本更新

可以提供一个内联脚本:

Map<String, Object> parameters = singletonMap("count", 4); // 用map保存脚本中的参数

Script inline = new Script(ScriptType.INLINE, "painless",
        "ctx._source.field += params.count", parameters);  // 创建内联脚本
updateRequest.script(inline);  // 应用到updateRequest中

或者,也可以提供一个存储脚本:

Script stored = new Script(
        ScriptType.STORED, null, "increment-field", parameters); // 创建脚本
updateRequest.script(stored); // 应用到updateRequest中

通过传递文档的部分内容来更新

当传递一个文档的部分内容时候,这些内容将会与已存在的文档进行合并。
可以以不同的形式传递文档内容:

UpdateRequest updateRequest = new UpdateRequest("posts", "doc", "1");
String jsonString = "{" +
        "\"updated\":\"2017-01-01\"," +
        "\"reason\":\"daily update\"" +
        "}";
updateRequest.doc(jsonString, XContentType.JSON); // (1)

(1)传递json字符串。

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("reason", "daily update");
UpdateRequest updateRequest = new UpdateRequest("posts", "doc", "1")
        .doc(jsonMap); // (1)

(1)传递map,这在内部会自动转换成json格式。

XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
    builder.timeField("updated", new Date());
    builder.field("reason", "daily update");
}
builder.endObject();
UpdateRequest updateRequest = new UpdateRequest("posts", "doc", "1")
        .doc(builder); // (1)

(1)传递一个XContentBuilder对象,这在内部会自动转换成json格式。

UpdateRequest updateRequest = new UpdateRequest("posts", "doc", "1")
         .doc("updated", new Date(),
              "reason", "daily update"); // (1)

(1)传递键值对。

Upsert

如果文档不存在,可以使用upsert函数传递一个文档内容,这将会插入新的文档:

String jsonString = "{\"created\":\"2017-01-01\"}";
updateRequest.upsert(jsonString, XContentType.JSON); // 传递一个json字符串

可选参数

下面是一些可选参数:

updateRequest.routing("routing"); // 设置分片路由

updateRequest.parent("parent"); // 设置父文档id

updateRequest.timeout(TimeValue.timeValueSeconds(1)); // 设置等待主分片可检索的超时时间
updateRequest.timeout("1s");                          // 设置等待主分片可检索的超时时间

updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); // 设置刷新策略
updateRequest.setRefreshPolicy("wait_for");                            // 设置刷新策略

updateRequest.retryOnConflict(3); // 发生冲突时重试update的次数

updateRequest.fetchSource(true); // 启用检索_source, 默认是禁用的

String[] includes = new String[]{"updated", "r*"};
String[] excludes = Strings.EMPTY_ARRAY;
updateRequest.fetchSource(
        new FetchSourceContext(true, includes, excludes)); // 配置检索_source时检索的字段

String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"updated"};
updateRequest.fetchSource(
        new FetchSourceContext(true, includes, excludes)); // 配置检索_source时不检索的字段

updateRequest.version(2); // 设置版本

updateRequest.detectNoop(false); // 禁用noop探测

updateRequest.scriptedUpsert(true); // 指示无论文档是否存在,脚本都必须运行(即如果文档不存在,脚本负责创建文档)

updateRequest.docAsUpsert(true); // 指定如果文档不存在,则使用upsert模式创建文档

updateRequest.waitForActiveShards(2); // 设置在执行update操作之前必须存在的活动分片数量
updateRequest.waitForActiveShards(ActiveShardCount.ALL); // 设置在执行update操作之前必须存在的活动分片数量

同步执行

当执行同步操作时,客户端线程阻塞知道返回UpdateResponse对象:

UpdateResponse updateResponse = client.update(
        updateRequest, RequestOptions.DEFAULT);

异步执行

异步执行不阻塞,需要提供监听器。

client.updateAsync(updateRequest, RequestOptions.DEFAULT, listener);

异步方法不会阻塞,会立即返回。一旦返回结果,如果成功则会回调ActionListener的onResponse方法,如果调用失败,则会回调onFailure方法。
典型的listener 如下:

listener = new ActionListener<UpdateResponse>() {
    @Override
    public void onResponse(UpdateResponse updateResponse) {
        // 成功
    }

    @Override
    public void onFailure(Exception e) {
        // 失败
    }
};

Update Response

可以通过返回的UpdateResponse对象获取操作信息:

String index = updateResponse.getIndex(); // 获取索引
String type = updateResponse.getType();   // 获取类型
String id = updateResponse.getId();       // 获取文档id
long version = updateResponse.getVersion(); // 获取版本
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
    // 如果是执行了upsert,文档被第一次创建
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    // 文档被更新了
} else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
    // 文档被删除了
} else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
    // 处理文档不受更新影响的情况(例如,没有对文档执行任何操作(noop))
}

当开启了对_source的检索,返回的UpdateResponse对象中会包含更新的文档的源信息:

GetResult result = updateResponse.getGetResult(); // 检索更新的文档
if (result.isExists()) {
    String sourceAsString = result.sourceAsString(); // 转换成json字符串
    Map<String, Object> sourceAsMap = result.sourceAsMap(); // 转换成map
    byte[] sourceAsBytes = result.source(); // 转换成byte数组
} else {
    // 处理文档源不在响应中的情况(默认)
}

也可以检查分片失败信息:

ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo(); // 获得分片信息
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    // 分片成功数量小于总数量时
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure :
            shardInfo.getFailures()) {
        String reason = failure.reason(); // 获取分片失败原因
    }
}

当对不存在的文档执行update操作,响应中会包含一个404状态码,并且会抛出ElasticsearchException 异常:

UpdateRequest updateRequest = new UpdateRequest("posts", "type", "does_not_exist")
        .doc("field", "value");
try {
    UpdateResponse updateResponse = client.update(
            updateRequest, RequestOptions.DEFAULT);
} catch (ElasticsearchException e) {
    if (e.status() == RestStatus.NOT_FOUND) {
        // 文档不存在时,抛404异常
    }
}

如果有版本冲突,也会抛出ElasticsearchException 异常:

UpdateRequest updateRequest = new UpdateRequest("posts", "doc", "1")
        .doc("field", "value")
        .version(1);
try {
    UpdateResponse updateResponse = client.update(
            updateRequest, RequestOptions.DEFAULT);
} catch(ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
        // 发生了版本冲突
    }
}

Bulk API

Bulk API用于提供批处理的功能。
注意:
Java High Level REST Client提供了一个Bulk Processor(批处理对象)来帮助执行批处理,简化了Bulk API的操作。
下面先来了解一下Bulk API的内容。

Bulk Request

一个BulkRequest对象可以被用来在一次请求中执行批量的索引、更新、删除操作。需要在BulkRequest中添加至少一个操作:

BulkRequest bulkRequest = new BulkRequest(); // 
bulkRequest.add(new IndexRequest("posts", "doc", "1")  
        .source(XContentType.JSON,"field", "foo"));
bulkRequest.add(new IndexRequest("posts", "doc", "2")  
        .source(XContentType.JSON,"field", "bar"));
bulkRequest.add(new IndexRequest("posts", "doc", "3")  
        .source(XContentType.JSON,"field", "baz"));

上边的操作中,在一个BulkRequest对象中添加了三个IndexRequest对象。
同一个BulkRequest对象中添加不同类型的操作对象也是可以的:

BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(new DeleteRequest("posts", "doc", "3")); 
bulkRequest.add(new UpdateRequest("posts", "doc", "2") 
        .doc(XContentType.JSON,"other", "test"));
bulkRequest.add(new IndexRequest("posts", "doc", "4")  
        .source(XContentType.JSON,"field", "baz"));

可选参数

下面是可选参数:

bulkRequest.timeout(TimeValue.timeValueMinutes(2)); // 设置等待批量操作完成的超时时间
bulkRequest.timeout("2m");                          // 设置等待批量操作完成的超时时间 

bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); // 设置刷新策略
bulkRequest.setRefreshPolicy("wait_for");                            // 设置刷新策略

bulkRequest.waitForActiveShards(2);                    // 设置执行操作前必须有多少活动的分片
bulkRequest.waitForActiveShards(ActiveShardCount.ALL); // 设置执行操作前必须有多少活动的分片

同步执行

BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);

异步执行

异步执行需要提供监听器:

client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener);

异步方法不会阻塞,会立即返回。一旦返回结果,如果成功则会回调ActionListener的onResponse方法,如果调用失败,则会回调onFailure方法。
典型的listener 如下:

ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
    @Override
    public void onResponse(BulkResponse bulkResponse) {
        // 成功
    }

    @Override
    public void onFailure(Exception e) {
        // 失败
    }
};

Bulk Response

可以通过返回的BulkResponse对象获取执行的信息:

for (BulkItemResponse bulkItemResponse : bulkResponse) { 
    DocWriteResponse itemResponse = bulkItemResponse.getResponse(); 

    if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
            || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) { 
        IndexResponse indexResponse = (IndexResponse) itemResponse;

    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) { 
        UpdateResponse updateResponse = (UpdateResponse) itemResponse;

    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) { 
        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
    }
}

BulkResponse对象提供了一个方法,可以帮助快速确定是否有操作执行失败:

if (bulkResponse.hasFailures()) { 
    // 有操作执行失败
}

如果有失败的情况,需要遍历所有的BulkItemResponse对象,找到失败的那些:

for (BulkItemResponse bulkItemResponse : bulkResponse) {
    if (bulkItemResponse.isFailed()) { 
        BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); 

    }
}

Bulk Processor

BulkProcessor通过提供一个实用程序类来简化Bulk API的使用,它允许索引/更新/删除操作。
为了执行请求,BulkProcessor需要以下组件:

  • RestHighLevelClient这个客户端是用来执行BulkRequest,并获得BulkResponse的。
  • BulkProcessor.Listener这个监听器会在BulkRequest执行前后或者失败时触发。

BulkProcessor.builder可以用来创建一个新的BulkProcessor对象:

BulkProcessor.Listener listener = new BulkProcessor.Listener() { // (1)
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        // (2)
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            BulkResponse response) {
        // (3)
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        // (4)
    }
};

BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
        (request, bulkListener) -> highLevelClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
BulkProcessor bulkProcessor = BulkProcessor.builder(bulkConsumer, listener).build(); // (5)

(1)创建一个BulkProcessor.Listener对象。 (2)该方法在每个BulkRequest对象执行前调用。 (3)该方法在每个BulkRequest对象执行后调用。 (4)该方法在执行BulkRequest失败后调用。 (5)通过调用build()方法创建一个新的BulkProcessor对象。将在幕后通过RestHighLevelClient.bulkAsync()执行BulkRequest。

BulkProcessor.Builder提供了方法用于配置BulkProcessor将如何执行请求:

BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
        (request, bulkListener) -> highLevelClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener);
builder.setBulkActions(500); // (1)
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); // (2)
builder.setConcurrentRequests(0); // (3)
builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); // (4)
builder.setBackoffPolicy(BackoffPolicy
        .constantBackoff(TimeValue.timeValueSeconds(1L), 3)); // (5)

(1)设置达到多少次请求后触发一次批量请求(默认是1000,可以通过设置为-1来禁用)。 (2)设置到达多少数据量是批量处理一次(默认是5MB,可以设置为-1来禁用)。 (3)设置单次批量处理允许的并发请求个数(默认是1,设置为0只允许执行单个请求)。 (4) 设置刷新间隔,如果到达时间点,所有等待的BulkRequest都会被flush(默认未开启)。 (5)设置一个初始化为等待一秒,尝试3次的后退策略。

一旦BulkProcessor被创建,就可以添加请求:

IndexRequest one = new IndexRequest("posts", "doc", "1").
        source(XContentType.JSON, "title",
                "In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts", "doc", "2")
        .source(XContentType.JSON, "title",
                "Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts", "doc", "3")
        .source(XContentType.JSON, "title",
                "The Future of Federated Search in Elasticsearch");

// 添加3个请求
bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);

请求将会被BulkProcessor执行,在BulkRequest被调用时会触发BulkProcessor.Listener的方法:

BulkProcessor.Listener listener = new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        int numberOfActions = request.numberOfActions(); 
        logger.debug("Executing bulk [{}] with {} requests",
                executionId, numberOfActions);
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            BulkResponse response) {
        if (response.hasFailures()) { 
            logger.warn("Bulk [{}] executed with failures", executionId);
        } else {
            logger.debug("Bulk [{}] completed in {} milliseconds",
                    executionId, response.getTook().getMillis());
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        logger.error("Failed to execute bulk", failure); 
    }
};

当所有请求都被添加到BulkProcessor中之后,我们怎么关闭它呢?
可以通过调用awaitClose()方法设定等待所有请求处理完毕或者等待指定时间后关闭:

boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS); // (1)

(1)当所有请求都处理完毕,该方法返回true,当等待了指定时间而请求还没处理完毕,该方法返回false。

还可以通过调用close()方法,立即关闭BulkProcessor:

bulkProcessor.close();

这两个方法都会在关闭BulkProcessor之前flush所有请求,并且禁止添加新的请求。