Kafka-Connector
针对Flink的流处理,最常用的组件就是Kafka,原始日志数据产生后会被日志采集工具采集到Kafka中让Flink去处理,处理之后的数据可能也会继续写入到Kafka中,Kafka可以作为Flink的DataSource和DataSink来使用 并且Kafka中的Partition机制和Flink的并行度机制可以深度结合,提高数据的读取效率和写入效率。
想要在Flink中使用Kafka需要添加对应的依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>1.11.1</version></dependency>
Kafka作为DataSource
案例
演示一下在Flink中如何消费Kafka中的数据,此时需要用到Kafka Consumer
/*** Flink从Kafka中消费数据*/object StreamKafkaSource {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//指定FlinkKafkaConsumer相关配置val topic = "t1"val prop = new Properties()prop.setProperty("bootstrap.servers", "bigdata1:9092")prop.setProperty("group.id", "con1")val kafkaConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), prop)//指定kafka作为sourceval text = env.addSource(kafkaConsumer)//将读取到的数据打印到控制台上text.print()env.execute()}}
在运行代码之前,需要先启动zookeeper集群和kafka集群
在kafka中创建topic:t1
cd /data/soft/kafka_2.12-2.4.1/
bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 5 --replication-factor 1 --topic t1
然后启动代码
再启动一个Kafka 的 console生产者模拟产生数据,验证效果。
[root@bigdata1 kafka_2.12-2.4.1]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic t1
>hello flink
>hello kafka
>
Kafka Consumer消费策略设置
val kafkaConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), prop)
//kafka consumer的消费策略设置
//默认策略,读取group.id对应保存的offset开始消费数据,读取不到则根据kafka中auto.offset.reset参数的值开始消费数据
kafkaConsumer.setStartFromGroupOffsets()
//从最早的记录开始消费数据,忽略已提交的offset信息
//kafkaConsumer.setStartFromEarliest()
//从最新的记录开始消费数据,忽略已提交的offset信息
//kafkaConsumer.setStartFromLatest()
//从指定的时间戳开始消费数据,对于每个分区,其时间戳大于或等于指定时间戳的记录将被作为起始位置
//kafkaConsumer.setStartFromTimestamp(1769498624)
Kafka Consumer的容错
Flink中也有checkpoint机制,Checkpoint是Flink实现容错机制的核心功能,它能够根据配置周期性地基于流中各个算子任务的State来生成快照,从而将这些State数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常。
当CheckPoint机制开启的时候,Consumer会定期把Kafka的offset信息还有其它算子任务的State信息一块保存起来
当Job失败重启的时候,Flink会从最近一次的CheckPoint中进行恢复数据,重新消费Kafka中的数据
为了能够使用支持容错的Consumer,需要开启checkpoint
如何开启Checkpoint
那如何开启checkpoint呢?
//每隔5000 ms执行一次checkpoint(设置checkpoint的周期)
env.enableCheckpointing(5000)
针对checkpoint的相关配置
//设置模式为.EXACTLY_ONCE (这是默认值) ,还可以设置为AT_LEAST_ONCE
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
//确保两次Checkpoint之间有至少多少 ms的间隔(checkpoint最小间隔)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
//Checkpoint必须在一分钟内完成,或者被丢弃(checkpoint的超时时间)
env.getCheckpointConfig.setCheckpointTimeout(60000)
//同一时间只允许执行一个Checkpoint
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
//表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
最后还有一个配置,设置State数据存储的位置
默认情况下,State数据会保存在TaskManager的内存中,Checkpoint执行时,会将State数据存储在JobManager的内存中。
具体的存储位置取决于State Backend的配置,Flink 一共提供了3种存储方式
MemoryStateBackendState数据保存在Java堆内存中,执行Checkpoint的时候,会把State的快照数据保存到JobManager的内存中,基于内存的State Backend在生产环境下不建议使用。FsStateBackendState数据保存在TaskManager的内存中,执行Checkpoint的时候,会把State的快照数据保存到配置的文件系统中,可以使用HDFS等分布式文件系统。RocksDBStateBackendRocksDB跟上面的都略有不同,它会在本地文件系统中维护State,State会直接写入本地RocksDB中。同时它需要配置一个远端的文件系统(一般是HDFS),在做Checkpoint的时候,会把本地的数据直接复制到远端的文件系统中。故障切换的时候直接从远端的文件系统中恢复数据到本地。RocksDB克服了State受内存限制的缺点,同时又能够持久化到远端文件系统中,推荐在生产环境中使用。
所以在这里我们使用第三种:RocksDBStateBackend 针对RocksDBStateBackend需要引入依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.12</artifactId>
<version>1.11.1</version>
</dependency>
//设置状态数据存储的位置
env.setStateBackend(new RocksDBStateBackend("hdfs://bigdata1:9000/flink/checkpoints",true))
Kafka Consumers Offset 自动提交
Kafka Consumers Offset自动提交机制需要根据Job是否开启Checkpoint来区分。
CheckPoint关闭时:通过参数enable.auto.commit和auto.commit.interval.ms控制
CheckPoint开启时:执行CheckPoint的时候才会提交offset,此时kafka中的自动提交机制就会被忽略
Kafka作为DataSink
案例
下面我们来看一下在Flink中如何向Kafka中写数据,此时需要用到Kafka Producer
/**
* Flink向Kafka中生产数据
* partitioner设置为FlinkFixedPartitioner,所有的数据都写入一个分区
*/
object StreamKafkaSink {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//socket作为source
val text = env.socketTextStream("bigdata1", 9002)
//指定FlinkKafkaProducer的相关配置
val topic = "t2"
val prop = new Properties()
prop.setProperty("bootstrap.servers", "bigdata1:9092")
/**
* 指定kafka作为sink
* KafkaSerializationSchemaWrapper的几个参数
* 1:topic:指定需要写入的topic名称即可
* 2:partitioner,通过自定义分区器实现将数据写入到指定topic的具体分区中
* 默认会使用FlinkFixedPartitioner,它表示会将所有的数据都写入指定topic的一个分区里面
* 如果不想自定义分区器,也不想使用默认的,可以直接使用null即可
* 3:writeTimeStamp,向topic中写入数据的时候,是否写入时间戳
* 如果写入了,那么在watermark的案例中,使用extractTimestamp()提起时间戳的时候,就可以直接使用recordTimestamp即可,它表示的就是我们在这里写入的数据对应的timestamp
* 4、SimpleStringSchema,序列化
*/
val kafkaProducer = new FlinkKafkaProducer[String](topic,
new KafkaSerializationSchemaWrapper[String](topic, new FlinkFixedPartitioner[String](), false, new SimpleStringSchema()),
prop,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
text.addSink(kafkaProducer)
env.execute("StreamKafkaSinkScala")
}
}
在执行代码之前,需要创建topic:t2
cd /data/soft/kafka_2.12-2.4.1/
bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 5 --replication-factor 1 --topic t2
启动kafka-console-consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic t2
开启socket,产生数据
[root@bigdata1 kafka_2.12-2.4.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic t2
hello
hello flink
hello kafka
hello scala
zzl
查看kafka-console-consumer
数据传输流程:socket→flink→kafka
在CMAK中查看t2中数据的分布,发现所有数据都在一个分区中
设置写入写入的分区
如果我们不需要自定义分区器的时候,直接传递为null即可。不要使用FlinkFixedPartitioner,它会将数据都写入到topic的一个分区中。
将FlinkFixedPartitioner设置为null,重新验证一次,创建一个新的topic,t3
bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 5 --replication-factor 1 --topic t3
val topic = "t2"
val kafkaProducer = new FlinkKafkaProducer[String](topic,
new KafkaSerializationSchemaWrapper[String](topic, new FlinkFixedPartitioner[String](), false, new SimpleStringSchema()),
prop,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
改成
val topic = "t3"
val kafkaProducer = new FlinkKafkaProducer[String](topic,
new KafkaSerializationSchemaWrapper[String](topic, null, false, new SimpleStringSchema()),
prop,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
开启socket,产生数据
[root@bigdata1 ~]# nc -l 9002
hhh
111
222
333
444
555
666
777
888
Kafka Producer的容错
如果Flink开启了CheckPoint,针对FlinkKafkaProducer可以提供EXACTLY_ONCE的语义保证
可以通过semantic 参数来选择三种不同的语义:
Semantic.NONE、Semantic.AT_LEAST_ONCE【默认】、Semantic.EXACTLY_ONCE
//开启checkpoint
env.enableCheckpointing(5000)
val kafkaProducer = new FlinkKafkaProducer[String](topic,
new KafkaSerializationSchemaWrapper[String](topic, null, false, new SimpleStringSchema()),
prop,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
注意:此时执行代码会发现无法正常执行,socket打开之后,启动代码,会发现socket监听会自动断开,表示代码执行断开了。
但是此时在idea中看不到任何报错信息,主要是因为我们之前把日志级别改为error级别了,把日志级别调整为warn之后就可以看到报错信息了
log4j.rootLogger=warn,stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
报错信息如下:
2020-08-12 19:21:59,759 [Sink: Unnamed (3/8)] [org.apache.flink.runtime.taskmanager.Task] [WARN] - Sink: Unnamed (3/8) (1b621d88e460877995ad37d34379c166) switched from RUNNING to FAILED.
org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).
at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1151)
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
at java.lang.Thread.run(Thread.java:748)
提示生产者中设置的事务超时时间大于broker中设置的事务超时时间。
因为Kafka服务中默认事务的超时时间是15min,但是FlinkKafkaProducer里面设置的事务超时时间默认是1h。EXACTLY_ONCE 模式依赖于事务,如果从 Flink 应用程序崩溃到完全重启的时间超过了 Kafka 的事务超时时间,那么将会有数据丢失,所以我们需要合理地配置事务超时时间,因此在使用 EXACTLY_ONCE 模式之前建议增加 Kafka broker 中transaction.max.timeout.ms 的值。
下面我们需要修改kafka中的server.properties配置文件
bigdata01、bigdata02、bigdata03都需要修改
[root@bigdata01 kafka_2.12-2.4.1]# vi config/server.properties
...
transaction.max.timeout.ms=3600000
[root@bigdata02 kafka_2.12-2.4.1]# vi config/server.properties
...
transaction.max.timeout.ms=3600000
[root@bigdata03 kafka_2.12-2.4.1]# vi config/server.properties
...
transaction.max.timeout.ms=3600000
改完配置文件之后,重启kafka
[root@bigdata01 kafka_2.12-2.4.1]# JMX_PORT=9988 bin/kafka-server-start.sh -daemon config/server.properties
[root@bigdata02 kafka_2.12-2.4.1]# JMX_PORT=9988 bin/kafka-server-start.sh -daemon config/server.properties
[root@bigdata03 kafka_2.12-2.4.1]# JMX_PORT=9988 bin/kafka-server-start.sh -daemon config/server.properties
重新执行Flink代码,此时就不报错了。

