Kafka-Connector

针对Flink的流处理,最常用的组件就是Kafka,原始日志数据产生后会被日志采集工具采集到Kafka中让Flink去处理,处理之后的数据可能也会继续写入到Kafka中,Kafka可以作为Flink的DataSource和DataSink来使用 并且Kafka中的Partition机制和Flink的并行度机制可以深度结合,提高数据的读取效率和写入效率。

想要在Flink中使用Kafka需要添加对应的依赖

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-kafka_2.12</artifactId>
  4. <version>1.11.1</version>
  5. </dependency>

Kafka作为DataSource

案例

演示一下在Flink中如何消费Kafka中的数据,此时需要用到Kafka Consumer

  1. /**
  2. * Flink从Kafka中消费数据
  3. */
  4. object StreamKafkaSource {
  5. def main(args: Array[String]): Unit = {
  6. val env = StreamExecutionEnvironment.getExecutionEnvironment
  7. //指定FlinkKafkaConsumer相关配置
  8. val topic = "t1"
  9. val prop = new Properties()
  10. prop.setProperty("bootstrap.servers", "bigdata1:9092")
  11. prop.setProperty("group.id", "con1")
  12. val kafkaConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), prop)
  13. //指定kafka作为source
  14. val text = env.addSource(kafkaConsumer)
  15. //将读取到的数据打印到控制台上
  16. text.print()
  17. env.execute()
  18. }
  19. }

在运行代码之前,需要先启动zookeeper集群和kafka集群
在kafka中创建topic:t1

cd /data/soft/kafka_2.12-2.4.1/

bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 5 --replication-factor 1 --topic t1

然后启动代码
再启动一个Kafka 的 console生产者模拟产生数据,验证效果。

[root@bigdata1 kafka_2.12-2.4.1]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic t1
>hello flink
>hello kafka
>

查看控制台
image.png

Kafka Consumer消费策略设置

    val kafkaConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), prop)

    //kafka consumer的消费策略设置
    //默认策略,读取group.id对应保存的offset开始消费数据,读取不到则根据kafka中auto.offset.reset参数的值开始消费数据
    kafkaConsumer.setStartFromGroupOffsets()

    //从最早的记录开始消费数据,忽略已提交的offset信息
    //kafkaConsumer.setStartFromEarliest()

    //从最新的记录开始消费数据,忽略已提交的offset信息
    //kafkaConsumer.setStartFromLatest()

    //从指定的时间戳开始消费数据,对于每个分区,其时间戳大于或等于指定时间戳的记录将被作为起始位置
    //kafkaConsumer.setStartFromTimestamp(1769498624)

Kafka Consumer的容错

Flink中也有checkpoint机制,Checkpoint是Flink实现容错机制的核心功能,它能够根据配置周期性地基于流中各个算子任务的State来生成快照,从而将这些State数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常。
当CheckPoint机制开启的时候,Consumer会定期把Kafka的offset信息还有其它算子任务的State信息一块保存起来
当Job失败重启的时候,Flink会从最近一次的CheckPoint中进行恢复数据,重新消费Kafka中的数据
为了能够使用支持容错的Consumer,需要开启checkpoint

如何开启Checkpoint

那如何开启checkpoint呢?

    //每隔5000 ms执行一次checkpoint(设置checkpoint的周期)
    env.enableCheckpointing(5000)

针对checkpoint的相关配置

    //设置模式为.EXACTLY_ONCE (这是默认值) ,还可以设置为AT_LEAST_ONCE
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    //确保两次Checkpoint之间有至少多少 ms的间隔(checkpoint最小间隔)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)

    //Checkpoint必须在一分钟内完成,或者被丢弃(checkpoint的超时时间)
    env.getCheckpointConfig.setCheckpointTimeout(60000)

    //同一时间只允许执行一个Checkpoint
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

    //表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

最后还有一个配置,设置State数据存储的位置
默认情况下,State数据会保存在TaskManager的内存中,Checkpoint执行时,会将State数据存储在JobManager的内存中。
具体的存储位置取决于State Backend的配置,Flink 一共提供了3种存储方式

  1. MemoryStateBackend State数据保存在Java堆内存中,执行Checkpoint的时候,会把State的快照数据保存到JobManager的内存中,基于内存的State Backend在生产环境下不建议使用。
  2. FsStateBackend State数据保存在TaskManager的内存中,执行Checkpoint的时候,会把State的快照数据保存到配置的文件系统中,可以使用HDFS等分布式文件系统。
  3. RocksDBStateBackend RocksDB跟上面的都略有不同,它会在本地文件系统中维护State,State会直接写入本地RocksDB中。同时它需要配置一个远端的文件系统(一般是HDFS),在做Checkpoint的时候,会把本地的数据直接复制到远端的文件系统中。故障切换的时候直接从远端的文件系统中恢复数据到本地。RocksDB克服了State受内存限制的缺点,同时又能够持久化到远端文件系统中,推荐在生产环境中使用。

所以在这里我们使用第三种:RocksDBStateBackend 针对RocksDBStateBackend需要引入依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
    <version>1.11.1</version>
</dependency>
    //设置状态数据存储的位置
    env.setStateBackend(new RocksDBStateBackend("hdfs://bigdata1:9000/flink/checkpoints",true))

Kafka Consumers Offset 自动提交

Kafka Consumers Offset自动提交机制需要根据Job是否开启Checkpoint来区分。
CheckPoint关闭时:通过参数enable.auto.commitauto.commit.interval.ms控制
CheckPoint开启时:执行CheckPoint的时候才会提交offset,此时kafka中的自动提交机制就会被忽略

Kafka作为DataSink

案例

下面我们来看一下在Flink中如何向Kafka中写数据,此时需要用到Kafka Producer

/**
 * Flink向Kafka中生产数据
 * partitioner设置为FlinkFixedPartitioner,所有的数据都写入一个分区
 */
object StreamKafkaSink {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //socket作为source
    val text = env.socketTextStream("bigdata1", 9002)

    //指定FlinkKafkaProducer的相关配置
    val topic = "t2"
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "bigdata1:9092")

    /**
     * 指定kafka作为sink
     * KafkaSerializationSchemaWrapper的几个参数
     * 1:topic:指定需要写入的topic名称即可
     * 2:partitioner,通过自定义分区器实现将数据写入到指定topic的具体分区中
     * 默认会使用FlinkFixedPartitioner,它表示会将所有的数据都写入指定topic的一个分区里面
     * 如果不想自定义分区器,也不想使用默认的,可以直接使用null即可
     * 3:writeTimeStamp,向topic中写入数据的时候,是否写入时间戳
     * 如果写入了,那么在watermark的案例中,使用extractTimestamp()提起时间戳的时候,就可以直接使用recordTimestamp即可,它表示的就是我们在这里写入的数据对应的timestamp
     * 4、SimpleStringSchema,序列化
     */
    val kafkaProducer = new FlinkKafkaProducer[String](topic,
      new KafkaSerializationSchemaWrapper[String](topic, new FlinkFixedPartitioner[String](), false, new SimpleStringSchema()),
      prop,
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
    text.addSink(kafkaProducer)

    env.execute("StreamKafkaSinkScala")
  }

}

在执行代码之前,需要创建topic:t2

cd /data/soft/kafka_2.12-2.4.1/

bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 5 --replication-factor 1 --topic t2

启动kafka-console-consumer

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic t2

开启socket,产生数据

[root@bigdata1 kafka_2.12-2.4.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic t2
hello
hello flink
hello kafka
hello scala
zzl

查看kafka-console-consumer
image.png

数据传输流程:socket→flink→kafka

CMAK中查看t2中数据的分布,发现所有数据都在一个分区中
image.png

设置写入写入的分区

如果我们不需要自定义分区器的时候,直接传递为null即可。不要使用FlinkFixedPartitioner,它会将数据都写入到topic的一个分区中。

FlinkFixedPartitioner设置为null,重新验证一次,创建一个新的topic,t3

bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 5 --replication-factor 1 --topic t3
    val topic = "t2"

    val kafkaProducer = new FlinkKafkaProducer[String](topic,
      new KafkaSerializationSchemaWrapper[String](topic, new FlinkFixedPartitioner[String](), false, new SimpleStringSchema()),
      prop,
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE)

改成

    val topic = "t3"

    val kafkaProducer = new FlinkKafkaProducer[String](topic,
      new KafkaSerializationSchemaWrapper[String](topic, null, false, new SimpleStringSchema()),
      prop,
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE)

开启socket,产生数据

[root@bigdata1 ~]# nc -l 9002
hhh
111
222
333
444
555
666
777
888

image.png
查看结果
image.png

Kafka Producer的容错

如果Flink开启了CheckPoint,针对FlinkKafkaProducer可以提供EXACTLY_ONCE的语义保证
可以通过semantic 参数来选择三种不同的语义:
Semantic.NONE、Semantic.AT_LEAST_ONCE【默认】、Semantic.EXACTLY_ONCE

    //开启checkpoint
    env.enableCheckpointing(5000)

    val kafkaProducer = new FlinkKafkaProducer[String](topic,
      new KafkaSerializationSchemaWrapper[String](topic, null, false, new SimpleStringSchema()),
      prop,
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE)

注意:此时执行代码会发现无法正常执行,socket打开之后,启动代码,会发现socket监听会自动断开,表示代码执行断开了。

但是此时在idea中看不到任何报错信息,主要是因为我们之前把日志级别改为error级别了,把日志级别调整为warn之后就可以看到报错信息了

log4j.rootLogger=warn,stdout

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

报错信息如下:

2020-08-12 19:21:59,759 [Sink: Unnamed (3/8)] [org.apache.flink.runtime.taskmanager.Task] [WARN] - Sink: Unnamed (3/8) (1b621d88e460877995ad37d34379c166) switched from RUNNING to FAILED.
org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).
    at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1151)
    at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
    at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
    at java.lang.Thread.run(Thread.java:748)

提示生产者中设置的事务超时时间大于broker中设置的事务超时时间。
因为Kafka服务中默认事务的超时时间是15min,但是FlinkKafkaProducer里面设置的事务超时时间默认是1h。EXACTLY_ONCE 模式依赖于事务,如果从 Flink 应用程序崩溃到完全重启的时间超过了 Kafka 的事务超时时间,那么将会有数据丢失,所以我们需要合理地配置事务超时时间,因此在使用 EXACTLY_ONCE 模式之前建议增加 Kafka broker 中transaction.max.timeout.ms 的值。
下面我们需要修改kafka中的server.properties配置文件
bigdata01、bigdata02、bigdata03都需要修改

[root@bigdata01 kafka_2.12-2.4.1]# vi config/server.properties
...
transaction.max.timeout.ms=3600000
[root@bigdata02 kafka_2.12-2.4.1]# vi config/server.properties
...
transaction.max.timeout.ms=3600000
[root@bigdata03 kafka_2.12-2.4.1]# vi config/server.properties
...
transaction.max.timeout.ms=3600000

改完配置文件之后,重启kafka

[root@bigdata01 kafka_2.12-2.4.1]# JMX_PORT=9988 bin/kafka-server-start.sh -daemon config/server.properties 
[root@bigdata02 kafka_2.12-2.4.1]# JMX_PORT=9988 bin/kafka-server-start.sh -daemon config/server.properties 
[root@bigdata03 kafka_2.12-2.4.1]# JMX_PORT=9988 bin/kafka-server-start.sh -daemon config/server.properties

重新执行Flink代码,此时就不报错了。