Apache Kafka Connector

This connector provides access to event streams served by Apache Kafka.

Flink provides special Kafka Connectors for reading and writing data from/to Kafka topics. The Flink Kafka Consumer integrates with Flink’s checkpointing mechanism to provide exactly-once processing semantics. To achieve that, Flink does not purely rely on Kafka’s consumer group offset tracking, but tracks and checkpoints these offsets internally as well.

Please pick a package (maven artifact id) and class name for your use-case and environment. For most users, the FlinkKafkaConsumer08 (part of flink-connector-kafka) is appropriate.

| Maven Dependency | Supported since | Consumer and Producer Class name | Kafka version | Notes |
| ---- | ---- | ---- | ---- | ---- |
| flink-connector-kafka-0.8_2.11 | 1.0.0 | FlinkKafkaConsumer08 FlinkKafkaProducer08 | 0.8.x | Uses the SimpleConsumer API of Kafka internally. Offsets are committed to ZK by Flink. |
| flink-connector-kafka-0.9_2.11 | 1.0.0 | FlinkKafkaConsumer09 FlinkKafkaProducer09 | 0.9.x | Uses the new Consumer API of Kafka. |
| flink-connector-kafka-0.10_2.11 | 1.2.0 | FlinkKafkaConsumer010 FlinkKafkaProducer010 | 0.10.x | This connector supports Kafka messages with timestamps both for producing and consuming. |
| flink-connector-kafka-0.11_2.11 | 1.4.0 | FlinkKafkaConsumer011 FlinkKafkaProducer011 | 0.11.x | Since 0.11.x Kafka does not support Scala 2.10. This connector supports Kafka transactional messaging to provide exactly-once semantics for the producer. |
| flink-connector-kafka_2.11 | 1.7.0 | FlinkKafkaConsumer FlinkKafkaProducer | >= 1.0.0 | This universal Kafka connector attempts to track the latest version of the Kafka client. The version of the client it uses may change between Flink releases. Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later. However, for Kafka 0.11.x and 0.10.x versions, we recommend using the dedicated flink-connector-kafka-0.11_2.11 and flink-connector-kafka-0.10_2.11 respectively. Attention: as of Flink 1.7 the universal Kafka connector is considered to be in BETA status and might not be as stable as the 0.11 connector. In case of problems with the universal connector, you can try flink-connector-kafka-0.11_2.11, which should be compatible with all Kafka versions starting from 0.11. |

Then, import the connector in your maven project:

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.7.1</version>
  </dependency>

Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution here.

Installing Apache Kafka

  • Follow the instructions from Kafka’s quickstart to download the code and launch a server (launching a Zookeeper and a Kafka server is required every time before starting the application).
  • If the Kafka and Zookeeper servers are running on a remote machine, then the advertised.host.name setting in the config/server.properties file must be set to the machine’s IP address.

Kafka 1.0.0+ Connector

Starting with Flink 1.7, there is a new universal Kafka connector that does not track a specific Kafka major version. Rather, it tracks the latest version of Kafka at the time of the Flink release.

If your Kafka broker version is 1.0.0 or newer, you should use this Kafka connector. If you use an older version of Kafka (0.11, 0.10, 0.9, or 0.8), you should use the connector corresponding to the broker version.

Compatibility

The universal Kafka connector is compatible with older and newer Kafka brokers through the compatibility guarantees of the Kafka client API and broker. It is compatible with broker versions 0.11.0 or newer, depending on the features used. For details on Kafka compatibility, please refer to the Kafka documentation.

Usage

To use the universal Kafka connector add a dependency to it:

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.7.1</version>
  </dependency>

Then instantiate the new source (FlinkKafkaConsumer) and sink (FlinkKafkaProducer). The API is backward compatible with the Kafka 0.11 connector, except for dropping the specific Kafka version from the module and class names.

Kafka Consumer

Flink’s Kafka consumer is called FlinkKafkaConsumer08 (or 09 for Kafka 0.9.0.x versions, etc., or just FlinkKafkaConsumer for Kafka >= 1.0.0 versions). It provides access to one or more Kafka topics.

The constructor accepts the following arguments:

  1. The topic name / list of topic names
  2. A DeserializationSchema / KeyedDeserializationSchema for deserializing the data from Kafka
  3. Properties for the Kafka consumer. The following properties are required:
    • “bootstrap.servers” (comma separated list of Kafka brokers)
    • “zookeeper.connect” (comma separated list of Zookeeper servers) (only required for Kafka 0.8)
    • “group.id” the id of the consumer group

Example:

  Properties properties = new Properties();
  properties.setProperty("bootstrap.servers", "localhost:9092");
  // only required for Kafka 0.8
  properties.setProperty("zookeeper.connect", "localhost:2181");
  properties.setProperty("group.id", "test");
  DataStream<String> stream = env
      .addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));

  val properties = new Properties()
  properties.setProperty("bootstrap.servers", "localhost:9092")
  // only required for Kafka 0.8
  properties.setProperty("zookeeper.connect", "localhost:2181")
  properties.setProperty("group.id", "test")
  val stream = env
      .addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))
  stream.print()

The DeserializationSchema

The Flink Kafka Consumer needs to know how to turn the binary data in Kafka into Java/Scala objects. The DeserializationSchema allows users to specify such a schema. The T deserialize(byte[] message) method gets called for each Kafka message, passing the value from Kafka.

It is usually helpful to start from the AbstractDeserializationSchema, which takes care of describing the produced Java/Scala type to Flink’s type system. Users that implement a vanilla DeserializationSchema need to implement the getProducedType(...) method themselves.
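As an illustration, a minimal value-only schema built on AbstractDeserializationSchema might look like the following sketch (the class name is made up for this example, and the import assumes the org.apache.flink.api.common.serialization package used by recent Flink versions):

  import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

  import java.io.IOException;
  import java.nio.charset.StandardCharsets;

  // A minimal sketch: interprets every Kafka message value as a UTF-8 string.
  // AbstractDeserializationSchema already provides getProducedType() for String.
  public class Utf8StringSchema extends AbstractDeserializationSchema<String> {

      @Override
      public String deserialize(byte[] message) throws IOException {
          return new String(message, StandardCharsets.UTF_8);
      }
  }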

For accessing both the key and value of the Kafka message, the KeyedDeserializationSchema has the following deserialize method T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset).

For convenience, Flink provides the following schemas:

  1. TypeInformationSerializationSchema (and TypeInformationKeyValueSerializationSchema) which creates a schema based on Flink’s TypeInformation. This is useful if the data is both written and read by Flink. This schema is a performant Flink-specific alternative to other generic serialization approaches.

  2. JsonDeserializationSchema (and JSONKeyValueDeserializationSchema) which turns the serialized JSON into an ObjectNode object, from which fields can be accessed using objectNode.get(“field”).as(Int/String/…)(). The KeyValue objectNode contains a “key” and “value” field which contain all fields, as well as an optional “metadata” field that exposes the offset/partition/topic for this message.

  3. AvroDeserializationSchema which reads data serialized with the Avro format using a statically provided schema. It can infer the schema from Avro generated classes (AvroDeserializationSchema.forSpecific(...)) or it can work with GenericRecords with a manually provided schema (with AvroDeserializationSchema.forGeneric(...)). This deserialization schema expects that the serialized records DO NOT contain an embedded schema.

    • There is also a version of this schema available that can look up the writer’s schema (the schema which was used to write the record) in the Confluent Schema Registry. Using these deserialization schemas, the record will be read with the schema that was retrieved from the Schema Registry and transformed to a statically provided one (either through ConfluentRegistryAvroDeserializationSchema.forGeneric(...) or ConfluentRegistryAvroDeserializationSchema.forSpecific(...)).

    To use this deserialization schema one has to add the following additional dependency:

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>1.7.1</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro-confluent-registry</artifactId>
    <version>1.7.1</version>
  </dependency>

When encountering a corrupted message that cannot be deserialized for any reason, there are two options - either throwing an exception from the deserialize(...) method which will cause the job to fail and be restarted, or returning null to allow the Flink Kafka consumer to silently skip the corrupted message. Note that due to the consumer’s fault tolerance (see below sections for more details), failing the job on the corrupted message will let the consumer attempt to deserialize the message again. Therefore, if deserialization still fails, the consumer will fall into a non-stop restart and fail loop on that corrupted message.
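For example, a schema that prefers skipping bad records over failing the job could be sketched like this (a hypothetical class, built on the same AbstractDeserializationSchema base as above):

  import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

  import java.io.IOException;
  import java.nio.charset.StandardCharsets;

  // A lenient sketch: messages that cannot be parsed as integers are skipped by
  // returning null instead of throwing, so the job keeps running.
  public class LenientIntegerSchema extends AbstractDeserializationSchema<Integer> {

      @Override
      public Integer deserialize(byte[] message) throws IOException {
          try {
              return Integer.valueOf(new String(message, StandardCharsets.UTF_8).trim());
          } catch (NumberFormatException e) {
              return null; // corrupted message: skip it silently
          }
      }
  }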

Kafka Consumers Start Position Configuration

The Flink Kafka Consumer allows configuring how the start position for Kafka partitions is determined.

Example:

  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
  myConsumer.setStartFromEarliest();     // start from the earliest record possible
  myConsumer.setStartFromLatest();       // start from the latest record
  myConsumer.setStartFromTimestamp(...); // start from specified epoch timestamp (milliseconds)
  myConsumer.setStartFromGroupOffsets(); // the default behaviour

  DataStream<String> stream = env.addSource(myConsumer);
  ...

  val env = StreamExecutionEnvironment.getExecutionEnvironment()

  val myConsumer = new FlinkKafkaConsumer08[String](...)
  myConsumer.setStartFromEarliest()      // start from the earliest record possible
  myConsumer.setStartFromLatest()        // start from the latest record
  myConsumer.setStartFromTimestamp(...)  // start from specified epoch timestamp (milliseconds)
  myConsumer.setStartFromGroupOffsets()  // the default behaviour

  val stream = env.addSource(myConsumer)
  ...

All versions of the Flink Kafka Consumer have the above explicit configuration methods for start position.

  • setStartFromGroupOffsets (default behaviour): Start reading partitions from the consumer group’s (group.id setting in the consumer properties) committed offsets in Kafka brokers (or Zookeeper for Kafka 0.8). If offsets could not be found for a partition, the auto.offset.reset setting in the properties will be used.

  • setStartFromEarliest() / setStartFromLatest(): Start from the earliest / latest record. Under these modes, committed offsets in Kafka will be ignored and not used as starting positions.

  • setStartFromTimestamp(long): Start from the specified timestamp. For each partition, the record whose timestamp is larger than or equal to the specified timestamp will be used as the start position. If a partition’s latest record is earlier than the timestamp, the partition will simply be read from the latest record. Under this mode, committed offsets in Kafka will be ignored and not used as starting positions.

You can also specify the exact offsets the consumer should start from for each partition:

  Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
  specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
  specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
  specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);

  myConsumer.setStartFromSpecificOffsets(specificStartOffsets);

  val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
  specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)
  specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L)
  specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L)

  myConsumer.setStartFromSpecificOffsets(specificStartOffsets)

The above example configures the consumer to start from the specified offsets for partitions 0, 1, and 2 of topic myTopic. The offset values should be the next record that the consumer should read for each partition. Note that if the consumer needs to read a partition which does not have a specified offset within the provided offsets map, it will fall back to the default group offsets behaviour (i.e. setStartFromGroupOffsets()) for that particular partition.

Note that these start position configuration methods do not affect the start position when the job is automatically restored from a failure or manually restored using a savepoint. On restore, the start position of each Kafka partition is determined by the offsets stored in the savepoint or checkpoint (please see the next section for information about checkpointing to enable fault tolerance for the consumer).

Kafka Consumers and Fault Tolerance

With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all its Kafka offsets, together with the state of other operations, in a consistent manner. In case of a job failure, Flink will restore the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that were stored in the checkpoint.

The interval of drawing checkpoints therefore defines how far back the program may have to go at most, in case of a failure.

To use fault tolerant Kafka Consumers, checkpointing of the topology needs to be enabled at the execution environment:

  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.enableCheckpointing(5000); // checkpoint every 5000 msecs

  val env = StreamExecutionEnvironment.getExecutionEnvironment()
  env.enableCheckpointing(5000) // checkpoint every 5000 msecs

Also note that Flink can only restart the topology if enough processing slots are available to do so. So if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards. Flink on YARN supports automatic restart of lost YARN containers.

If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.

Kafka Consumers Topic and Partition Discovery

Partition discovery

The Flink Kafka Consumer supports discovering dynamically created Kafka partitions, and consumes them with exactly-once guarantees. All partitions discovered after the initial retrieval of partition metadata (i.e., when the job starts running) will be consumed from the earliest possible offset.

By default, partition discovery is disabled. To enable it, set a non-negative value for flink.partition-discovery.interval-millis in the provided properties config, representing the discovery interval in milliseconds.
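For example, the interval can be passed through the same Properties object that is handed to the consumer; the 10-second value below is an arbitrary choice for illustration:

  Properties properties = new Properties();
  properties.setProperty("bootstrap.servers", "localhost:9092");
  properties.setProperty("group.id", "test");
  // check for newly created partitions of the subscribed topics every 10 seconds
  properties.setProperty("flink.partition-discovery.interval-millis", "10000");

  FlinkKafkaConsumer<String> myConsumer =
      new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);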

Limitation: When the consumer is restored from a savepoint taken with a Flink version prior to 1.3.x, partition discovery cannot be enabled on the restore run. If enabled, the restore will fail with an exception. In this case, in order to use partition discovery, please first take a savepoint in Flink 1.3.x and then restore from that.

Topic discovery

At a higher level, the Flink Kafka Consumer is also capable of discovering topics, based on pattern matching on the topic names using regular expressions. See below for an example:

  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  Properties properties = new Properties();
  properties.setProperty("bootstrap.servers", "localhost:9092");
  properties.setProperty("group.id", "test");

  FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
      java.util.regex.Pattern.compile("test-topic-[0-9]"),
      new SimpleStringSchema(),
      properties);

  DataStream<String> stream = env.addSource(myConsumer);
  ...

  val env = StreamExecutionEnvironment.getExecutionEnvironment()

  val properties = new Properties()
  properties.setProperty("bootstrap.servers", "localhost:9092")
  properties.setProperty("group.id", "test")

  val myConsumer = new FlinkKafkaConsumer08[String](
    java.util.regex.Pattern.compile("test-topic-[0-9]"),
    new SimpleStringSchema,
    properties)

  val stream = env.addSource(myConsumer)
  ...

In the above example, all topics with names that match the specified regular expression (starting with test-topic- and ending with a single digit) will be subscribed by the consumer when the job starts running.

To allow the consumer to discover dynamically created topics after the job started running, set a non-negative value for flink.partition-discovery.interval-millis. This allows the consumer to discover partitions of new topics with names that also match the specified pattern.

Kafka Consumers Offset Committing Behaviour Configuration

The Flink Kafka Consumer allows configuring the behaviour of how offsets are committed back to Kafka brokers (or Zookeeper in 0.8). Note that the Flink Kafka Consumer does not rely on the committed offsets for fault tolerance guarantees. The committed offsets are only a means to expose the consumer’s progress for monitoring purposes.

The way to configure offset commit behaviour is different, depending on whether or not checkpointing is enabled for the job.

  • Checkpointing disabled: if checkpointing is disabled, the Flink Kafka Consumer relies on the automatic periodic offset committing capability of the internally used Kafka clients. Therefore, to disable or enable offset committing, simply set the enable.auto.commit (or auto.commit.enable for Kafka 0.8) / auto.commit.interval.ms keys to appropriate values in the provided Properties configuration.

  • Checkpointing enabled: if checkpointing is enabled, the Flink Kafka Consumer will commit the offsets stored in the checkpointed states when the checkpoints are completed. This ensures that the committed offsets in Kafka brokers are consistent with the offsets in the checkpointed states. Users can choose to disable or enable offset committing by calling the setCommitOffsetsOnCheckpoints(boolean) method on the consumer (by default, the behaviour is true). Note that in this scenario, the automatic periodic offset committing settings in Properties are completely ignored. (A short sketch of both cases follows this list.)
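Reusing the properties and myConsumer variables from the earlier examples, the sketch below illustrates both cases; the interval value is arbitrary:

  // checkpointing disabled: let the internally used Kafka client commit offsets automatically
  properties.setProperty("enable.auto.commit", "true");
  properties.setProperty("auto.commit.interval.ms", "5000");

  // checkpointing enabled: commit offsets on completed checkpoints (true is the default)
  myConsumer.setCommitOffsetsOnCheckpoints(true);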

Kafka Consumers and Timestamp Extraction/Watermark Emission

In many scenarios, the timestamp of a record is embedded (explicitly or implicitly) in the record itself. In addition, the user may want to emit watermarks either periodically, or in an irregular fashion, e.g. based on special records in the Kafka stream that contain the current event-time watermark. For these cases, the Flink Kafka Consumer allows the specification of an AssignerWithPeriodicWatermarks or an AssignerWithPunctuatedWatermarks.

You can specify your custom timestamp extractor/watermark emitter as described here, or use one of the predefined ones. After doing so, you can pass it to your consumer in the following way:

  Properties properties = new Properties();
  properties.setProperty("bootstrap.servers", "localhost:9092");
  // only required for Kafka 0.8
  properties.setProperty("zookeeper.connect", "localhost:2181");
  properties.setProperty("group.id", "test");

  FlinkKafkaConsumer08<String> myConsumer =
      new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
  myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());

  DataStream<String> stream = env.addSource(myConsumer);
  stream.print();

  val properties = new Properties()
  properties.setProperty("bootstrap.servers", "localhost:9092")
  // only required for Kafka 0.8
  properties.setProperty("zookeeper.connect", "localhost:2181")
  properties.setProperty("group.id", "test")

  val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)
  myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())

  val stream = env.addSource(myConsumer)
  stream.print()

Internally, an instance of the assigner is executed per Kafka partition. When such an assigner is specified, for each record read from Kafka, the extractTimestamp(T element, long previousElementTimestamp) is called to assign a timestamp to the record and the Watermark getCurrentWatermark() (for periodic) or the Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp) (for punctuated) is called to determine if a new watermark should be emitted and with which timestamp.
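As an illustration, a periodic assigner that reuses the timestamp already attached to the record and tolerates records arriving up to five seconds out of order could be sketched as follows (the class name and the five-second bound are made up for this example):

  import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
  import org.apache.flink.streaming.api.watermark.Watermark;

  // A sketch: trusts the timestamp already attached to the record (e.g. by Kafka 0.10+)
  // and emits watermarks that lag five seconds behind the highest timestamp seen so far.
  public class LaggingWatermarkAssigner implements AssignerWithPeriodicWatermarks<String> {

      private static final long MAX_OUT_OF_ORDERNESS_MS = 5000;

      private long currentMaxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS_MS;

      @Override
      public long extractTimestamp(String element, long previousElementTimestamp) {
          currentMaxTimestamp = Math.max(currentMaxTimestamp, previousElementTimestamp);
          return previousElementTimestamp;
      }

      @Override
      public Watermark getCurrentWatermark() {
          return new Watermark(currentMaxTimestamp - MAX_OUT_OF_ORDERNESS_MS);
      }
  }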

Note: If a watermark assigner depends on records read from Kafka to advance its watermarks (which is commonly the case), all topics and partitions need to have a continuous stream of records. Otherwise, the watermarks of the whole application cannot advance and all time-based operations, such as time windows or functions with timers, cannot make progress. A single idle Kafka partition causes this behavior. A Flink improvement is planned to prevent this from happening (see FLINK-5479: Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions). In the meantime, a possible workaround is to send heartbeat messages to all consumed partitions that advance the watermarks of idle partitions.

Kafka Producer

Flink’s Kafka Producer is called FlinkKafkaProducer011 (or 010 for Kafka 0.10.0.x versions, etc., or just FlinkKafkaProducer for Kafka >= 1.0.0 versions). It allows writing a stream of records to one or more Kafka topics.

Example:

  DataStream<String> stream = ...;

  FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
      "localhost:9092",          // broker list
      "my-topic",                // target topic
      new SimpleStringSchema()); // serialization schema

  // versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
  // this method is not available for earlier Kafka versions
  myProducer.setWriteTimestampToKafka(true);

  stream.addSink(myProducer);

  val stream: DataStream[String] = ...

  val myProducer = new FlinkKafkaProducer011[String](
      "localhost:9092",        // broker list
      "my-topic",              // target topic
      new SimpleStringSchema)  // serialization schema

  // versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
  // this method is not available for earlier Kafka versions
  myProducer.setWriteTimestampToKafka(true)

  stream.addSink(myProducer)

The above examples demonstrate the basic usage of creating a Flink Kafka Producer to write streams to a single Kafka target topic. For more advanced usages, there are other constructor variants that allow providing the following:

  • Providing custom properties: The producer allows providing a custom properties configuration for the internal KafkaProducer. Please refer to the Apache Kafka documentation for details on how to configure Kafka Producers.
  • Custom partitioner: To assign records to specific partitions, you can provide an implementation of a FlinkKafkaPartitioner to the constructor. This partitioner will be called for each record in the stream to determine which exact partition of the target topic the record should be sent to. Please see Kafka Producer Partitioning Scheme for more details.
  • Advanced serialization schema: Similar to the consumer, the producer also allows using an advanced serialization schema called KeyedSerializationSchema, which allows serializing the key and value separately. It also allows overriding the target topic, so that one producer instance can send data to multiple topics (see the sketch after this list).
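A sketch of such an advanced schema (the class name is hypothetical; returning null from getTargetTopic keeps the producer's default topic):

  import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

  import java.nio.charset.StandardCharsets;

  // A sketch: writes the value as UTF-8 bytes, uses no key, and keeps the default topic.
  public class SimpleKeyedStringSchema implements KeyedSerializationSchema<String> {

      @Override
      public byte[] serializeKey(String element) {
          return null; // no key
      }

      @Override
      public byte[] serializeValue(String element) {
          return element.getBytes(StandardCharsets.UTF_8);
      }

      @Override
      public String getTargetTopic(String element) {
          return null; // null means: use the default topic configured on the producer
      }
  }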

Kafka Producer Partitioning Scheme

By default, if a custom partitioner is not specified for the Flink Kafka Producer, the producer will use a FlinkFixedPartitioner that maps each Flink Kafka Producer parallel subtask to a single Kafka partition (i.e., all records received by a sink subtask will end up in the same Kafka partition).

A custom partitioner can be implemented by extending the FlinkKafkaPartitioner class. All Kafka versions’ constructors allow providing a custom partitioner when instantiating the producer. Note that the partitioner implementation must be serializable, as it will be transferred across Flink nodes. Also, keep in mind that any state in the partitioner will be lost on job failures since the partitioner is not part of the producer’s checkpointed state.
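A minimal sketch of such a partitioner (the class name and the key-hash strategy are just illustrative):

  import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

  // A sketch: picks the target partition from a hash of the serialized key,
  // falling back to the first partition when no key is present.
  public class KeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {

      @Override
      public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
          if (key == null) {
              return partitions[0];
          }
          return partitions[(java.util.Arrays.hashCode(key) & 0x7fffffff) % partitions.length];
      }
  }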

It is also possible to completely avoid using any kind of partitioner, and simply let Kafka partition the written records by their attached key (as determined for each record using the provided serialization schema). To do this, provide a null custom partitioner when instantiating the producer. It is important to provide null as the custom partitioner; as explained above, if a custom partitioner is not specified the FlinkFixedPartitioner is used instead.

Kafka Producers and Fault Tolerance

Kafka 0.8

Before 0.9, Kafka did not provide any mechanisms to guarantee at-least-once or exactly-once semantics.

Kafka 0.9 and 0.10

With Flink’s checkpointing enabled, the FlinkKafkaProducer09 and FlinkKafkaProducer010 can provide at-least-once delivery guarantees.

Besides enabling Flink’s checkpointing, you should also configure the setter methods setLogFailuresOnly(boolean) and setFlushOnCheckpoint(boolean) appropriately.

  • setLogFailuresOnly(boolean): by default, this is set to false. Enabling this will let the producer only log failures instead of catching and rethrowing them. This essentially counts the record as succeeded, even if it was never written to the target Kafka topic. This must be disabled for at-least-once.
  • setFlushOnCheckpoint(boolean): by default, this is set to true. With this enabled, Flink’s checkpoints will wait for any on-the-fly records at the time of the checkpoint to be acknowledged by Kafka before succeeding the checkpoint. This ensures that all records before the checkpoint have been written to Kafka. This must be enabled for at-least-once.

In conclusion, the Kafka producer by default has at-least-once guarantees for versions 0.9 and 0.10, with setLogFailuresOnly set to false and setFlushOnCheckpoint set to true.
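In code, this could look like the following sketch for the 0.10 producer, reusing the stream from the producer example above (broker address and topic are placeholders):

  FlinkKafkaProducer010<String> myProducer = new FlinkKafkaProducer010<>(
      "localhost:9092",          // broker list
      "my-topic",                // target topic
      new SimpleStringSchema()); // serialization schema

  myProducer.setLogFailuresOnly(false);  // must stay false for at-least-once
  myProducer.setFlushOnCheckpoint(true); // must stay true for at-least-once

  stream.addSink(myProducer);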

Note: By default, the number of retries is set to "0". This means that when setLogFailuresOnly is set to false, the producer fails immediately on errors, including leader changes. The value is set to "0" by default to avoid duplicate messages in the target topic that are caused by retries. For most production environments with frequent broker changes, we recommend setting the number of retries to a higher value.

Note: There is currently no transactional producer for Kafka, so Flink cannot guarantee exactly-once delivery into a Kafka topic.

Kafka 0.11 and newer

With Flink’s checkpointing enabled, the FlinkKafkaProducer011 (FlinkKafkaProducer for Kafka >= 1.0.0 versions) can provide exactly-once delivery guarantees.

Besides enabling Flink’s checkpointing, you can also choose one of three different operating modes by passing the appropriate semantic parameter to the FlinkKafkaProducer011 (FlinkKafkaProducer for Kafka >= 1.0.0 versions):

  • Semantic.NONE: Flink will not guarantee anything. Produced records can be lost or they can be duplicated.
  • Semantic.AT_LEAST_ONCE (default setting): similar to setFlushOnCheckpoint(true) in FlinkKafkaProducer010. This guarantees that no records will be lost (although they can be duplicated).
  • Semantic.EXACTLY_ONCE: uses Kafka transactions to provide exactly-once semantics (see the sketch after this list). Whenever you write to Kafka using transactions, do not forget about setting the desired isolation.level (read_committed or read_uncommitted - the latter one is the default value) for any application consuming records from Kafka.
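A sketch of constructing an exactly-once producer, assuming the constructor variant that accepts a KeyedSerializationSchema and a Semantic and reusing the stream from the producer example above (broker address, topic, and timeout are placeholder values):

  Properties producerProperties = new Properties();
  producerProperties.setProperty("bootstrap.servers", "localhost:9092");
  // keep the transaction timeout below the broker's transaction.max.timeout.ms (see the caveats below)
  producerProperties.setProperty("transaction.timeout.ms", "900000"); // 15 minutes

  FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(
      "my-topic",
      new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
      producerProperties,
      FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);

  stream.addSink(myProducer);
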
Caveats

Semantic.EXACTLY_ONCE mode relies on the ability to commit transactions that were started before taking a checkpoint, after recovering from the said checkpoint. If the time between a Flink application crash and the completed restart is larger than Kafka’s transaction timeout, there will be data loss (Kafka will automatically abort transactions that exceeded the timeout). Having this in mind, please configure your transaction timeout appropriately to your expected down times.

Kafka brokers by default have transaction.max.timeout.ms set to 15 minutes. This property does not allow setting transaction timeouts for the producers larger than its value. FlinkKafkaProducer011 by default sets the transaction.timeout.ms property in the producer config to 1 hour, thus transaction.max.timeout.ms should be increased before using the Semantic.EXACTLY_ONCE mode.

In the read_committed mode of KafkaConsumer, any transactions that were not finished (neither aborted nor completed) will block all reads from the given Kafka topic past any unfinished transaction. In other words, after the following sequence of events:

  1. User started transaction1 and wrote some records using it
  2. User started transaction2 and wrote some further records using it
  3. User committed transaction2

Even if records from transaction2 are already committed, they will not be visible to the consumers until transaction1 is committed or aborted. This has two implications:

  • First of all, during normal working of Flink applications, users can expect a delay in the visibility of the records produced into Kafka topics, equal to the average time between completed checkpoints.
  • Secondly, in case of a Flink application failure, the topics into which this application was writing will be blocked for readers until the application restarts or the configured transaction timeout time passes. This remark only applies to cases where multiple agents/applications write to the same Kafka topic.

Note: Semantic.EXACTLY_ONCE mode uses a fixed-size pool of KafkaProducers per FlinkKafkaProducer011 instance. One producer from this pool is used for each checkpoint. If the number of concurrent checkpoints exceeds the pool size, FlinkKafkaProducer011 will throw an exception and will fail the whole application. Please configure the max pool size and the max number of concurrent checkpoints accordingly.

Note: Semantic.EXACTLY_ONCE takes all possible measures to not leave any lingering transactions that would block consumers from reading from the Kafka topic for longer than necessary. However, in the event of a failure of the Flink application before the first checkpoint, after restarting such an application there is no information in the system about previous pool sizes. Thus it is unsafe to scale down a Flink application before the first checkpoint completes by a factor larger than FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR.

Using Kafka timestamps and Flink event time in Kafka 0.10

Since Apache Kafka 0.10+, Kafka’s messages can carry timestamps, indicating the time the event has occurred (see “event time” in Apache Flink) or the time when the message has been written to the Kafka broker.

The FlinkKafkaConsumer010 will emit records with the timestamp attached, if the time characteristic in Flink is set to TimeCharacteristic.EventTime (StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)).

The Kafka consumer does not emit watermarks. To emit watermarks, the same mechanisms as described above in “Kafka Consumers and Timestamp Extraction/Watermark Emission” using the assignTimestampsAndWatermarks method are applicable.

There is no need to define a timestamp extractor when using the timestamps from Kafka. The previousElementTimestamp argument of the extractTimestamp() method contains the timestamp carried by the Kafka message.

A timestamp extractor for a Kafka consumer would look like this:

  public long extractTimestamp(Long element, long previousElementTimestamp) {
      return previousElementTimestamp;
  }

The FlinkKafkaProducer010 only emits the record timestamp if setWriteTimestampToKafka(true) is set.

  FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config =
      FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new SimpleStringSchema(), standardProps);
  config.setWriteTimestampToKafka(true);

Kafka Connector metrics

Flink’s Kafka connectors provide some metrics through Flink’s metrics system to analyze the behavior of the connector. The producers export Kafka’s internal metrics through Flink’s metric system for all supported versions. The consumers export all metrics starting from Kafka version 0.9. The Kafka documentation lists all exported metrics.

In addition to these metrics, all consumers expose the current-offsets and committed-offsets for each topic partition. The current-offsets refers to the current offset in the partition. This refers to the offset of the last element that we retrieved and emitted successfully. The committed-offsets is the last committed offset.

The Kafka Consumers in Flink commit the offsets back to Zookeeper (Kafka 0.8) or the Kafka brokers (Kafka 0.9+). If checkpointing is disabled, offsets are committed periodically. With checkpointing, the commit happens once all operators in the streaming topology have confirmed that they’ve created a checkpoint of their state. This provides users with at-least-once semantics for the offsets committed to Zookeeper or the broker. For offsets checkpointed in Flink, the system provides exactly-once guarantees.

The offsets committed to ZK or the broker can also be used to track the read progress of the Kafka consumer. The difference between the committed offset and the most recent offset in each partition is called the consumer lag. If the Flink topology is consuming the data slower from the topic than new data is added, the lag will increase and the consumer will fall behind. For large production deployments we recommend monitoring that metric to avoid increasing latency.

Enabling Kerberos Authentication (for versions 0.9 and above only)

Flink provides first-class support through the Kafka connector to authenticate to a Kafka installation configured for Kerberos. Simply configure Flink in flink-conf.yaml to enable Kerberos authentication for Kafka like so:

  1. Configure Kerberos credentials by setting the following -
    • security.kerberos.login.use-ticket-cache: By default, this is true and Flink will attempt to use Kerberos credentials in ticket caches managed by kinit. Note that when using the Kafka connector in Flink jobs deployed on YARN, Kerberos authorization using ticket caches will not work. This is also the case when deploying using Mesos, as authorization using ticket cache is not supported for Mesos deployments.
    • security.kerberos.login.keytab and security.kerberos.login.principal: To use Kerberos keytabs instead, set values for both of these properties.
  2. Append KafkaClient to security.kerberos.login.contexts: This tells Flink to provide the configured Kerberos credentials to the Kafka login context to be used for Kafka authentication.

Once Kerberos-based Flink security is enabled, you can authenticate to Kafka with either the Flink Kafka Consumer or Producer by simply including the following two settings in the provided properties configuration that is passed to the internal Kafka client:

  • Set security.protocol to SASL_PLAINTEXT (default NONE): The protocol used to communicate to Kafka brokers. When using standalone Flink deployment, you can also use SASL_SSL; please see how to configure the Kafka client for SSL here.
  • Set sasl.kerberos.service.name to kafka (default kafka): The value for this should match the sasl.kerberos.service.name used for Kafka broker configurations. A mismatch in service name between client and server configuration will cause the authentication to fail. (See the sketch after this list.)
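A sketch of a consumer configured this way (the broker address is a placeholder):

  Properties properties = new Properties();
  properties.setProperty("bootstrap.servers", "broker-host:9092");
  properties.setProperty("group.id", "test");
  // the two settings required for Kerberos-authenticated brokers
  properties.setProperty("security.protocol", "SASL_PLAINTEXT");
  properties.setProperty("sasl.kerberos.service.name", "kafka");

  FlinkKafkaConsumer<String> myConsumer =
      new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);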

For more information on Flink configuration for Kerberos security, please see here. You can also find here further details on how Flink internally sets up Kerberos-based security.

Troubleshooting

If you have a problem with Kafka when using Flink, keep in mind that Flink only wraps KafkaConsumer or KafkaProducer, so your problem might be independent of Flink and can sometimes be solved by upgrading Kafka brokers, reconfiguring Kafka brokers, or reconfiguring KafkaConsumer or KafkaProducer in Flink. Some examples of common problems are listed below.

Data loss

Depending on your Kafka configuration, even after Kafka acknowledges writes, you can still experience data loss. In particular keep in mind the following properties in the Kafka config:

  • acks
  • log.flush.interval.messages
  • log.flush.interval.ms
  • log.flush.*

Default values for the above options can easily lead to data loss. Please refer to the Kafka documentation for more explanation.
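As a sketch, a stricter producer-side setting is to require acknowledgement from all in-sync replicas, while the log.flush.* settings live in the broker's config/server.properties (values below are illustrative, not recommendations):

  // producer side: require acknowledgement from all in-sync replicas
  Properties producerProperties = new Properties();
  producerProperties.setProperty("acks", "all");

  // broker side (config/server.properties), illustrative values only:
  //   log.flush.interval.messages=10000
  //   log.flush.interval.ms=1000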

UnknownTopicOrPartitionException

One possible cause of this error is that a new leader election is taking place, for example after or during restarting a Kafka broker. This is a retriable exception, so the Flink job should be able to restart and resume normal operation. It can also be circumvented by changing the retries property in the producer settings. However, this might cause reordering of messages, which in turn, if undesired, can be circumvented by setting max.in.flight.requests.per.connection to 1.
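A sketch of the two producer settings mentioned above (values are illustrative):

  Properties producerProperties = new Properties();
  // retry instead of failing immediately on retriable errors such as leader elections
  producerProperties.setProperty("retries", "3");
  // keep message order despite retries by allowing only one in-flight request per connection
  producerProperties.setProperty("max.in.flight.requests.per.connection", "1");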