This connector provides sinks that can request document actions to an Elasticsearch Index. To use this connector, add one of the following dependencies to your project, depending on the version of the Elasticsearch installation:

Maven Dependency                        Supported since    Elasticsearch version
flink-connector-elasticsearch_2.11      1.0.0              1.x
flink-connector-elasticsearch2_2.11     1.0.0              2.x
flink-connector-elasticsearch5_2.11     1.3.0              5.x
flink-connector-elasticsearch6_2.11     1.6.0              6 and later versions

Note that the streaming connectors are currently not part of the binary distribution. See here for information about how to package the program with the libraries for cluster execution.

Installing Elasticsearch

Instructions for setting up an Elasticsearch cluster can be found here. Make sure to set and remember a cluster name. This must be set when creating an ElasticsearchSink for requesting document actions against your cluster.

Elasticsearch Sink

The ElasticsearchSink uses a TransportClient (before 6.x) or RestHighLevelClient (starting with 6.x) to communicate with an Elasticsearch cluster.

The examples below show how to configure and create a sink for each supported Elasticsearch version, in both Java and Scala:

Java, Elasticsearch 1.x:

  import org.apache.flink.api.common.functions.RuntimeContext;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
  import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
  import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;

  import org.elasticsearch.action.index.IndexRequest;
  import org.elasticsearch.client.Requests;
  import org.elasticsearch.common.transport.InetSocketTransportAddress;
  import org.elasticsearch.common.transport.TransportAddress;

  import java.net.InetAddress;
  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;

  DataStream<String> input = ...;

  Map<String, String> config = new HashMap<>();
  config.put("cluster.name", "my-cluster-name");
  // This instructs the sink to emit after every element, otherwise they would be buffered
  config.put("bulk.flush.max.actions", "1");

  List<TransportAddress> transportAddresses = new ArrayList<>();
  transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300));
  transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300));

  input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
      public IndexRequest createIndexRequest(String element) {
          Map<String, String> json = new HashMap<>();
          json.put("data", element);

          return Requests.indexRequest()
                  .index("my-index")
                  .type("my-type")
                  .source(json);
      }

      @Override
      public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
          indexer.add(createIndexRequest(element));
      }
  }));
Java, Elasticsearch 5.x:

  import org.apache.flink.api.common.functions.RuntimeContext;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
  import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
  import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;

  import org.elasticsearch.action.index.IndexRequest;
  import org.elasticsearch.client.Requests;

  import java.net.InetAddress;
  import java.net.InetSocketAddress;
  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;

  DataStream<String> input = ...;

  Map<String, String> config = new HashMap<>();
  config.put("cluster.name", "my-cluster-name");
  // This instructs the sink to emit after every element, otherwise they would be buffered
  config.put("bulk.flush.max.actions", "1");

  List<InetSocketAddress> transportAddresses = new ArrayList<>();
  transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
  transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));

  input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
      public IndexRequest createIndexRequest(String element) {
          Map<String, String> json = new HashMap<>();
          json.put("data", element);

          return Requests.indexRequest()
                  .index("my-index")
                  .type("my-type")
                  .source(json);
      }

      @Override
      public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
          indexer.add(createIndexRequest(element));
      }
  }));
Java, Elasticsearch 6.x and above:

  import org.apache.flink.api.common.functions.RuntimeContext;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
  import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
  import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;

  import org.apache.http.HttpHost;
  import org.elasticsearch.action.index.IndexRequest;
  import org.elasticsearch.client.Requests;

  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;

  DataStream<String> input = ...;

  List<HttpHost> httpHosts = new ArrayList<>();
  httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
  httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));

  // use an ElasticsearchSink.Builder to create an ElasticsearchSink
  ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
      httpHosts,
      new ElasticsearchSinkFunction<String>() {
          public IndexRequest createIndexRequest(String element) {
              Map<String, String> json = new HashMap<>();
              json.put("data", element);

              return Requests.indexRequest()
                      .index("my-index")
                      .type("my-type")
                      .source(json);
          }

          @Override
          public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
              indexer.add(createIndexRequest(element));
          }
      }
  );

  // configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
  esSinkBuilder.setBulkFlushMaxActions(1);

  // provide a RestClientFactory for custom configuration on the internally created REST client
  esSinkBuilder.setRestClientFactory(
      restClientBuilder -> {
          restClientBuilder.setDefaultHeaders(...);
          restClientBuilder.setMaxRetryTimeoutMillis(...);
          restClientBuilder.setPathPrefix(...);
          restClientBuilder.setHttpClientConfigCallback(...);
      }
  );

  // finally, build and add the sink to the job's pipeline
  input.addSink(esSinkBuilder.build());
Scala, Elasticsearch 1.x:

  import org.apache.flink.api.common.functions.RuntimeContext
  import org.apache.flink.streaming.api.datastream.DataStream
  import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink
  import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
  import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer

  import org.elasticsearch.action.index.IndexRequest
  import org.elasticsearch.client.Requests
  import org.elasticsearch.common.transport.InetSocketTransportAddress
  import org.elasticsearch.common.transport.TransportAddress

  import java.net.InetAddress
  import java.util.ArrayList
  import java.util.HashMap
  import java.util.List
  import java.util.Map

  val input: DataStream[String] = ...

  val config = new java.util.HashMap[String, String]
  config.put("cluster.name", "my-cluster-name")
  // This instructs the sink to emit after every element, otherwise they would be buffered
  config.put("bulk.flush.max.actions", "1")

  val transportAddresses = new java.util.ArrayList[TransportAddress]
  transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300))
  transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300))

  input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
    def createIndexRequest(element: String): IndexRequest = {
      val json = new java.util.HashMap[String, String]
      json.put("data", element)

      return Requests.indexRequest()
        .index("my-index")
        .`type`("my-type")
        .source(json)
    }

    override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
      indexer.add(createIndexRequest(element))
    }
  }))
Scala, Elasticsearch 5.x:

  import org.apache.flink.api.common.functions.RuntimeContext
  import org.apache.flink.streaming.api.datastream.DataStream
  import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
  import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
  import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink

  import org.elasticsearch.action.index.IndexRequest
  import org.elasticsearch.client.Requests

  import java.net.InetAddress
  import java.net.InetSocketAddress
  import java.util.ArrayList
  import java.util.HashMap
  import java.util.List
  import java.util.Map

  val input: DataStream[String] = ...

  val config = new java.util.HashMap[String, String]
  config.put("cluster.name", "my-cluster-name")
  // This instructs the sink to emit after every element, otherwise they would be buffered
  config.put("bulk.flush.max.actions", "1")

  val transportAddresses = new java.util.ArrayList[InetSocketAddress]
  transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
  transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300))

  input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
    def createIndexRequest(element: String): IndexRequest = {
      val json = new java.util.HashMap[String, String]
      json.put("data", element)

      return Requests.indexRequest()
        .index("my-index")
        .`type`("my-type")
        .source(json)
    }

    override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
      indexer.add(createIndexRequest(element))
    }
  }))
Scala, Elasticsearch 6.x and above:

  import org.apache.flink.api.common.functions.RuntimeContext
  import org.apache.flink.streaming.api.datastream.DataStream
  import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
  import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
  import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink

  import org.apache.http.HttpHost
  import org.elasticsearch.action.index.IndexRequest
  import org.elasticsearch.client.Requests

  import java.util.ArrayList
  import java.util.List

  val input: DataStream[String] = ...

  val httpHosts = new java.util.ArrayList[HttpHost]
  httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"))
  httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"))

  val esSinkBuilder = new ElasticsearchSink.Builder[String](
    httpHosts,
    new ElasticsearchSinkFunction[String] {
      def createIndexRequest(element: String): IndexRequest = {
        val json = new java.util.HashMap[String, String]
        json.put("data", element)

        return Requests.indexRequest()
          .index("my-index")
          .`type`("my-type")
          .source(json)
      }

      override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
        indexer.add(createIndexRequest(element))
      }
    }
  )

  // configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
  esSinkBuilder.setBulkFlushMaxActions(1)

  // provide a RestClientFactory for custom configuration on the internally created REST client
  esSinkBuilder.setRestClientFactory(
    restClientBuilder => {
      restClientBuilder.setDefaultHeaders(...)
      restClientBuilder.setMaxRetryTimeoutMillis(...)
      restClientBuilder.setPathPrefix(...)
      restClientBuilder.setHttpClientConfigCallback(...)
    }
  )

  // finally, build and add the sink to the job's pipeline
  input.addSink(esSinkBuilder.build)

For Elasticsearch versions that still use the now deprecated TransportClient to communicate with the Elasticsearch cluster (i.e., versions at or below 5.x), note how a Map of Strings is used to configure the ElasticsearchSink. This config map will be forwarded directly when creating the internally used TransportClient. The configuration keys are documented in the Elasticsearch documentation here. Especially important is the cluster.name parameter, which must correspond to the name of your cluster.

For Elasticsearch 6.x and above, the RestHighLevelClient is used internally for cluster communication. By default, the connector uses the default configuration of the REST client. To customize the REST client configuration, users can provide a RestClientFactory implementation when setting up the ElasticsearchSink.Builder that builds the sink.

Also note that the examples only demonstrate performing a single index request for each incoming element. In general, the ElasticsearchSinkFunction can be used to perform multiple requests of different types (e.g., DeleteRequest, UpdateRequest, etc.).
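As a sketch only (not taken from the original examples), the following shows how a single ElasticsearchSinkFunction could emit either an IndexRequest or a DeleteRequest per element. The Tuple2<Boolean, String> element layout, the use of the payload as document id, and the "my-index"/"my-type" names are assumptions made for illustration:

  import org.apache.flink.api.common.functions.RuntimeContext;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
  import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;

  import org.elasticsearch.action.delete.DeleteRequest;
  import org.elasticsearch.client.Requests;

  import java.util.HashMap;
  import java.util.Map;

  // hypothetical element layout: f0 == true marks a deletion, f1 carries the document id / payload
  ElasticsearchSinkFunction<Tuple2<Boolean, String>> indexOrDelete =
      new ElasticsearchSinkFunction<Tuple2<Boolean, String>>() {
          @Override
          public void process(Tuple2<Boolean, String> element, RuntimeContext ctx, RequestIndexer indexer) {
              if (element.f0) {
                  // remove a previously indexed document
                  indexer.add(new DeleteRequest("my-index", "my-type", element.f1));
              } else {
                  // index (or re-index) the document
                  Map<String, String> json = new HashMap<>();
                  json.put("data", element.f1);
                  indexer.add(Requests.indexRequest()
                          .index("my-index")
                          .type("my-type")
                          .id(element.f1)
                          .source(json));
              }
          }
      };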

Internally, each parallel instance of the Flink Elasticsearch Sink uses a BulkProcessor to send action requests to the cluster. This will buffer elements before sending them in bulk to the cluster. The BulkProcessor executes bulk requests one at a time, i.e. there will be no two concurrent flushes of the buffered actions in progress.

Elasticsearch Sinks and Fault Tolerance

With Flink’s checkpointing enabled, the Flink Elasticsearch Sink guarantees at-least-once delivery of action requests to Elasticsearch clusters. It does so by waiting for all pending action requests in the BulkProcessor at the time of checkpoints. This effectively assures that all requests before the checkpoint was triggered have been successfully acknowledged by Elasticsearch, before proceeding to process more records sent to the sink.

More details on checkpoints and fault tolerance are in the fault tolerance docs.

To use fault tolerant Elasticsearch Sinks, checkpointing of the topology needs to be enabled at the execution environment:

Java:

  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.enableCheckpointing(5000); // checkpoint every 5000 msecs

Scala:

  val env = StreamExecutionEnvironment.getExecutionEnvironment()
  env.enableCheckpointing(5000) // checkpoint every 5000 msecs

NOTE: Users can disable flushing if they wish to do so, by calling disableFlushOnCheckpoint() on the created ElasticsearchSink. Be aware that this essentially means the sink will not provide any strong delivery guarantees anymore, even with checkpointing for the topology enabled.
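A minimal sketch of opting out, reusing the config, transportAddresses, and sink function from the Elasticsearch 5.x example above (Java):

  ElasticsearchSink<String> sink = new ElasticsearchSink<>(
      config, transportAddresses, new ElasticsearchSinkFunction<String>() {...});

  // trade the at-least-once guarantee for checkpoints that no longer wait on pending bulk requests
  sink.disableFlushOnCheckpoint();

  input.addSink(sink);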

Communication using Embedded Node (only for Elasticsearch 1.x)

For Elasticsearch versions 1.x, communication using an embedded node is also supported. See here for information about the differences between communicating with Elasticsearch with an embedded node and a TransportClient.

Below is an example of how to create an ElasticsearchSink that uses an embedded node instead of a TransportClient:

Java:

  DataStream<String> input = ...;

  Map<String, String> config = new HashMap<>();
  // This instructs the sink to emit after every element, otherwise they would be buffered
  config.put("bulk.flush.max.actions", "1");
  config.put("cluster.name", "my-cluster-name");

  input.addSink(new ElasticsearchSink<>(config, new ElasticsearchSinkFunction<String>() {
      public IndexRequest createIndexRequest(String element) {
          Map<String, String> json = new HashMap<>();
          json.put("data", element);

          return Requests.indexRequest()
                  .index("my-index")
                  .type("my-type")
                  .source(json);
      }

      @Override
      public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
          indexer.add(createIndexRequest(element));
      }
  }));
Scala:

  val input: DataStream[String] = ...

  val config = new java.util.HashMap[String, String]
  config.put("bulk.flush.max.actions", "1")
  config.put("cluster.name", "my-cluster-name")

  input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String] {
    def createIndexRequest(element: String): IndexRequest = {
      val json = new java.util.HashMap[String, String]
      json.put("data", element)

      return Requests.indexRequest()
        .index("my-index")
        .`type`("my-type")
        .source(json)
    }

    override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
      indexer.add(createIndexRequest(element))
    }
  }))

The difference is that now we do not need to provide a list of addresses of Elasticsearch nodes.

Handling Failing Elasticsearch Requests

Elasticsearch action requests may fail due to a variety of reasons, including temporarily saturated node queue capacity or malformed documents to be indexed. The Flink Elasticsearch Sink allows the user to specify how request failures are handled, by simply implementing an ActionRequestFailureHandler and providing it to the constructor.

Below is an example:

Java:

  DataStream<String> input = ...;

  input.addSink(new ElasticsearchSink<>(
      config, transportAddresses,
      new ElasticsearchSinkFunction<String>() {...},
      new ActionRequestFailureHandler() {
          @Override
          public void onFailure(ActionRequest action,
                  Throwable failure,
                  int restStatusCode,
                  RequestIndexer indexer) throws Throwable {

              if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
                  // full queue; re-add document for indexing
                  indexer.add(action);
              } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
                  // malformed document; simply drop request without failing sink
              } else {
                  // for all other failures, fail the sink
                  // here the failure is simply rethrown, but users can also choose to throw custom exceptions
                  throw failure;
              }
          }
  }));
Scala:

  val input: DataStream[String] = ...

  input.addSink(new ElasticsearchSink(
    config, transportAddresses,
    new ElasticsearchSinkFunction[String] {...},
    new ActionRequestFailureHandler {
      @throws(classOf[Throwable])
      override def onFailure(action: ActionRequest,
          failure: Throwable,
          restStatusCode: Int,
          indexer: RequestIndexer) {

        if (ExceptionUtils.containsThrowable(failure, classOf[EsRejectedExecutionException])) {
          // full queue; re-add document for indexing
          indexer.add(action)
        } else if (ExceptionUtils.containsThrowable(failure, classOf[ElasticsearchParseException])) {
          // malformed document; simply drop request without failing sink
        } else {
          // for all other failures, fail the sink
          // here the failure is simply rethrown, but users can also choose to throw custom exceptions
          throw failure
        }
      }
  }))

The above example will let the sink re-add requests that failed due to queue capacity saturation and drop requests with malformed documents, without failing the sink. For all other failures, the sink will fail. If an ActionRequestFailureHandler is not provided to the constructor, the sink will fail for any kind of error.

Note that onFailure is only called for failures that still occur after the BulkProcessor has internally finished all backoff retry attempts. By default, the BulkProcessor retries up to a maximum of 8 attempts with exponential backoff. For more information on the behaviour of the internal BulkProcessor and how to configure it, please see the following section.

By default, if a failure handler is not provided, the sink uses a NoOpFailureHandler that simply fails for all kinds of exceptions. The connector also provides a RetryRejectedExecutionFailureHandler implementation that always re-adds requests that have failed due to queue capacity saturation.
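As a sketch, the provided handler can be passed in place of a custom one (Java, using the Elasticsearch 5.x constructor shown above; the import path assumes the handler lives in the connector's util package):

  import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;

  input.addSink(new ElasticsearchSink<>(
      config, transportAddresses,
      new ElasticsearchSinkFunction<String>() {...},
      // keep re-adding requests that were rejected because of saturated node queues
      new RetryRejectedExecutionFailureHandler()));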

IMPORTANT: Re-adding requests back to the internal BulkProcessor on failures will lead to longer checkpoints, as the sink will also need to wait for the re-added requests to be flushed when checkpointing. For example, when using RetryRejectedExecutionFailureHandler, checkpoints will need to wait until Elasticsearch node queues have enough capacity for all the pending requests. This also means that if re-added requests never succeed, the checkpoint will never finish.

Failure handling for Elasticsearch 1.x: For Elasticsearch 1.x, it is not feasible to match on the type of the failure because the exact type cannot be retrieved through the older Java client APIs (the types will be general Exceptions that only differ in the failure message). In this case, it is recommended to match on the provided REST status code.
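For example, a handler along the following lines branches on the REST status code instead of the exception type (a sketch only; the concrete status codes 429 and 400 are illustrative assumptions):

  ActionRequestFailureHandler statusCodeHandler = new ActionRequestFailureHandler() {
      @Override
      public void onFailure(ActionRequest action,
              Throwable failure,
              int restStatusCode,
              RequestIndexer indexer) throws Throwable {

          if (restStatusCode == 429) {
              // 429 Too Many Requests: node queue is saturated, retry the action
              indexer.add(action);
          } else if (restStatusCode == 400) {
              // 400 Bad Request: e.g. a malformed document; drop it without failing the sink
          } else {
              // anything else fails the sink
              throw failure;
          }
      }
  };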

Configuring the Internal Bulk Processor

The internal BulkProcessor can be further configured for its behaviour on how buffered action requests are flushed, by setting the following values in the provided Map<String, String>:

  • bulk.flush.max.actions: Maximum amount of actions to buffer before flushing.
  • bulk.flush.max.size.mb: Maximum size of data (in megabytes) to buffer before flushing.
  • bulk.flush.interval.ms: Interval at which to flush regardless of the amount or size of buffered actions.

For versions 2.x and above, configuring how temporary request errors are retried is also supported (a combined example follows the list below):

  • bulk.flush.backoff.enable: Whether or not to perform retries with backoff delay for a flush if one or more of its actions failed due to a temporary EsRejectedExecutionException.
  • bulk.flush.backoff.type: The type of backoff delay, either CONSTANT or EXPONENTIAL
  • bulk.flush.backoff.delay: The amount of delay for backoff. For constant backoff, this is simply the delay between each retry. For exponential backoff, this is the initial base delay.
  • bulk.flush.backoff.retries: The amount of backoff retries to attempt.
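For example, a configuration map combining these keys might look as follows (Java; the concrete values are illustrative only):

  Map<String, String> config = new HashMap<>();
  config.put("cluster.name", "my-cluster-name");

  // flush after 500 buffered actions, 5 MB of buffered data, or 60 seconds, whichever comes first
  config.put("bulk.flush.max.actions", "500");
  config.put("bulk.flush.max.size.mb", "5");
  config.put("bulk.flush.interval.ms", "60000");

  // for 2.x and above: retry temporarily rejected flushes up to 3 times with exponential backoff, starting at 50 ms
  config.put("bulk.flush.backoff.enable", "true");
  config.put("bulk.flush.backoff.type", "EXPONENTIAL");
  config.put("bulk.flush.backoff.delay", "50");
  config.put("bulk.flush.backoff.retries", "3");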

More information about Elasticsearch can be found here.

Packaging the Elasticsearch Connector into an Uber-Jar

For the execution of your Flink program, it is recommended to build a so-called uber-jar (executable jar) containing all your dependencies (see here for further information).

Alternatively, you can put the connector’s jar file into Flink’s lib/ folder to make it available system-wide, i.e. for all jobs being run.