title: Big Data - Flink Advanced Programming A
date: 2021-01-22 12:11:51
tags: Big Data - Flink
category:
- Big Data
- Flink
summary: Flink development - sources, operators, sinks, state and fault tolerance
top: false
cover: true
author: 张文军


I. Flink Development: Source, Operator, Sink

1. Computation Model

img

2. DataSource

Controlling Latency
By default, elements of a stream are not sent over the network one by one (that would cause unnecessary network traffic); instead they are buffered. The buffer flush timeout can be configured in the Flink configuration file, on the ExecutionEnvironment, or on an individual operator (the default is 100 ms).
· Benefit: higher throughput
· Drawback: higher latency
· How to strike the balance (see the sketch below):

  • For maximum throughput, call setBufferTimeout(-1); this removes the timeout, and buffers are only flushed when they are full
  • For minimum latency, set the timeout to a value close to 0 (e.g. 5 or 10 ms)
  • Avoid setting the timeout to 0, because that causes a measurable performance penalty
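
The following is a minimal, hedged sketch of how the buffer timeout can be set at both levels; the socket source and port are placeholders, not values from the post:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Global default for this job: flush network buffers at least every 10 ms (lower latency).
env.setBufferTimeout(10);

// Per-operator override: -1 removes the timeout, buffers are flushed only when full (max throughput).
env.socketTextStream("localhost", 6666)
   .setBufferTimeout(-1)
   .print();
```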

Built-in data sources

  1. File-based

    env.readTextFile("file:///path")
    env.readFile(inputFormat, "file:///path");

  2. Socket-based

    env.socketTextStream("localhost", 6666, '\n')

  3. Collection-based

    import org.apache.flink.api.scala._
    env.fromCollection(List(1,2,3))
    env.fromElements(1,2,3)
    env.generateSequence(0, 1000) // no implicit conversion needed

Custom data sources

  1. Implement SourceFunction (non-parallel)

Example code:
function:

    package cn.zhanghub.source;

    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileChecksum;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.BufferedReader;
    import java.io.InputStreamReader;

    public class FileCountryDictSourceFunction implements SourceFunction<String> {

        private String md5 = null;
        private Boolean isCancel = true;
        private Integer interval = 10000;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            Path pathString = new Path("hdfs://ns1/user/qingniu/country_data");
            Configuration hadoopConf = new Configuration();
            FileSystem fs = FileSystem.get(hadoopConf);
            while (isCancel) {
                if (!fs.exists(pathString)) {
                    Thread.sleep(interval);
                    continue;
                }
                FileChecksum fileChecksum = fs.getFileChecksum(pathString);
                String md5Str = fileChecksum.toString();
                String currentMd5 = md5Str.substring(md5Str.indexOf(":") + 1);
                if (!currentMd5.equals(md5)) {
                    FSDataInputStream open = fs.open(pathString);
                    BufferedReader reader = new BufferedReader(new InputStreamReader(open));
                    String line = reader.readLine();
                    while (line != null) {
                        ctx.collect(line);
                        line = reader.readLine();
                    }
                    reader.close();
                    md5 = currentMd5;
                }
                Thread.sleep(interval);
            }
        }

        @Override
        public void cancel() {
            isCancel = false;
        }
    }

Driver class:

    package cn.zhanghub.source;

    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class FileSource {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> stringDataStreamSource = env.addSource(new FileCountryDictSourceFunction());
            stringDataStreamSource.print();
            env.execute();
        }
    }
  2. Implement ParallelSourceFunction or RichParallelSourceFunction (parallel)

The Kafka connector source is the canonical example:
· Based on Kafka's partition mechanism, Flink parallelizes the data splits
· Flink can consume Kafka topics and also sink data back into Kafka
· On failure, Flink coordinates with Kafka through its checkpoint mechanism to recover the application (by resetting the Kafka offsets)
Add the Maven dependency:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
        <version>1.9.3</version>
        <scope>compile</scope>
    </dependency>

Kafka versions supported by Flink:

| Maven Dependency | Supported since | Class name | Kafka version | Notes |
| --- | --- | --- | --- | --- |
| flink-connector-kafka-0.8_2.11 | 1.0.0 | FlinkKafkaConsumer08 / FlinkKafkaProducer08 | 0.8.x | Uses Kafka's SimpleConsumer API internally; Flink commits offsets to ZooKeeper |
| flink-connector-kafka-0.9_2.11 | 1.0.0 | FlinkKafkaConsumer09 / FlinkKafkaProducer09 | 0.9.x | Uses Kafka's new Consumer API |
| flink-connector-kafka-0.10_2.11 | 1.2.0 | FlinkKafkaConsumer010 / FlinkKafkaProducer010 | 0.10.x | Producing and consuming Kafka messages with timestamps is supported |

1). FlinkKafkaConsumer Source API
img
1. FlinkKafkaConsumer010 constructors:

    FlinkKafkaConsumer010(String topic, KeyedDeserializationSchema<T> deserializer, Properties props)
    FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deserializer, Properties props)
    FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props)
    FlinkKafkaConsumer010(Pattern subscriptionPattern, KeyedDeserializationSchema<T> deserializer, Properties props)

  • Three constructor parameters:
    • the topic(s) to consume (a single topic name / a list of names / a regex pattern)
    • a DeserializationSchema / KeyedDeserializationSchema (deserializes the data read from Kafka)
    • the Kafka consumer properties, three of which are required:
      • bootstrap.servers (comma-separated list of Kafka brokers)
      • zookeeper.connect (comma-separated list of ZooKeeper servers; only needed for Kafka 0.8)
      • group.id (the consumer group id)

2. Deserialization schema types

  • Purpose: deserialize the binary data fetched from Kafka
  • FlinkKafkaConsumer needs to know how to turn the binary data in Kafka into Java/Scala objects; DeserializationSchema defines that conversion via T deserialize(byte[] message)
  • Every message the FlinkKafkaConsumer reads from Kafka is passed through T deserialize(byte[] message) of the DeserializationSchema
  • Deserialization schema types (interfaces):
  • DeserializationSchema (deserializes the value only)
  • KeyedDeserializationSchema

3. Common deserialization schemas

  • SimpleStringSchema
  • JSONDeserializationSchema / JSONKeyValueDeserializationSchema
  • TypeInformationSerializationSchema / TypeInformationKeyValueSerializationSchema
  • AvroDeserializationSchema

4. Custom deserialization schemas:

  • Implement the DeserializationSchema or KeyedDeserializationSchema interface

DeserializationSchema:
img
KeyedDeserializationSchema:
img
bean:

    package cn.zhanghub.source;

    public class HainiuKafkaRecord {

        private String record;

        public HainiuKafkaRecord(String record) {
            this.record = record;
        }

        public String getRecord() {
            return record;
        }

        public void setRecord(String record) {
            this.record = record;
        }
    }

schema:

    package cn.zhanghub.source;

    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;

    import java.io.IOException;

    public class HainiuKafkaRecordSchema implements DeserializationSchema<HainiuKafkaRecord> {

        @Override
        public HainiuKafkaRecord deserialize(byte[] message) throws IOException {
            HainiuKafkaRecord hainiuKafkaRecord = new HainiuKafkaRecord(new String(message));
            return hainiuKafkaRecord;
        }

        @Override
        public boolean isEndOfStream(HainiuKafkaRecord nextElement) {
            return false;
        }

        @Override
        public TypeInformation<HainiuKafkaRecord> getProducedType() {
            return TypeInformation.of(HainiuKafkaRecord.class);
        }
    }

5. Minimal FlinkKafkaConsumer010 sample code
img
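
The original screenshot of the minimal sample is missing, so here is a hedged sketch of what such a minimal consumer looks like; broker addresses, group id and topic name are placeholders, not values from the post:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

import java.util.Properties;

public class MinimalKafkaSource {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder brokers
        props.setProperty("group.id", "my-group");                           // placeholder group id

        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("my_topic", new SimpleStringSchema(), props);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(consumer).print();
        env.execute();
    }
}
```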
6. FlinkKafkaConsumer consumption start position
img

| Consume mode | Description | Notes |
| --- | --- | --- |
| setStartFromEarliest | Start from the head of the queue, i.e. the earliest records | Offsets committed to Kafka/ZooKeeper by the internal consumer are ignored |
| setStartFromLatest | Start from the tail of the queue, i.e. the latest records | |
| setStartFromGroupOffsets() | Default: start from the offsets recorded for the consumer group, i.e. continue where the previous run left off | The offsets the consumer committed to Kafka/ZooKeeper are used as the starting position; group.id is set in the consumer properties; if no committed offset is found, the auto.offset.reset policy from the consumer properties is used |
| setStartFromSpecificOffsets(Map) | Start from explicitly specified offsets | |
| setStartFromTimestamp(long) | Start from the given timestamp | For each partition, the first record whose timestamp is greater than or equal to the given timestamp is used as the starting position. If a partition's latest record is older than the timestamp, the partition is simply read from its latest record. In this mode, offsets committed to Kafka/ZooKeeper are ignored |

Note

  1. With Kafka 0.8 the consumer commits offsets to ZooKeeper; later versions commit them to Kafka itself (a special topic: __consumer_offsets)
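
The one mode in the table that never appears in the later examples is setStartFromSpecificOffsets; a hedged sketch of its use (partition numbers and offsets are made-up values, and KafkaTopicPartition comes from org.apache.flink.streaming.connectors.kafka.internals):

```java
Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
specificOffsets.put(new KafkaTopicPartition("flink_event", 0), 23L); // partition 0 starts at offset 23
specificOffsets.put(new KafkaTopicPartition("flink_event", 1), 31L); // partition 1 starts at offset 31
kafkaSource.setStartFromSpecificOffsets(specificOffsets);
```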

7. Dynamic partition discovery

  • The Flink Kafka Consumer supports discovering Kafka partitions dynamically while still guaranteeing exactly-once
  • Partition discovery is disabled by default; set flink.partition-discovery.interval-millis to a value greater than 0 to enable it:

properties.setProperty("flink.partition-discovery.interval-millis", "30000")

8. Dynamic topic discovery

  • The Flink Kafka Consumer supports discovering Kafka topics dynamically, but only when the topics are specified via a regular expression
  • Topic discovery is disabled by default; set flink.partition-discovery.interval-millis to a value greater than 0 to enable it

img
Example code:

  1. package cn.zhanghub.source;
  2. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  3. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
  6. import java.util.Properties;
  7. import java.util.regex.Pattern;
  8. public class KafkaRichParallelSource {
  9. public static void main(String[] args) throws Exception {
  10. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  11. Properties kafkaConsumerProps = new Properties();
  12. kafkaConsumerProps.setProperty("bootstrap.servers", "s1.hadoop:9092,s2.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
  13. kafkaConsumerProps.setProperty("group.id", "qingniuflink");
  14. kafkaConsumerProps.setProperty("flink.partition-discovery.interval-millis", "30000");
  15. FlinkKafkaConsumer010<String> kafkaSource = new FlinkKafkaConsumer010<>("flink_event", new SimpleStringSchema(), kafkaConsumerProps);
  16. // FlinkKafkaConsumer010<String> kafkaSource = new FlinkKafkaConsumer010<>(Pattern.compile("flink_event_[0-9]"), new SimpleStringSchema(), kafkaConsumerProps);
  17. // kafkaSource.setStartFromEarliest()
  18. // kafkaSource.setStartFromGroupOffsets()
  19. kafkaSource.setStartFromLatest();
  20. DataStreamSource<String> kafkaInput = env.addSource(kafkaSource);
  21. kafkaInput.print();
  22. FlinkKafkaConsumer010<HainiuKafkaRecord> kafkaBeanSource = new FlinkKafkaConsumer010<>("flink_event", new HainiuKafkaRecordSchema(), kafkaConsumerProps);
  23. DataStreamSource<HainiuKafkaRecord> kafkaBeanInput = env.addSource(kafkaBeanSource);
  24. kafkaBeanInput.print();
  25. env.execute();
  26. }
  27. }

3. Transformations

The figure below shows the main stream types currently supported in Flink and the conversions between them.
img
DataStream
DataStream is the core data structure of Flink's streaming API. It represents a parallel stream running across multiple partitions. A DataStream is obtained from a StreamExecutionEnvironment via env.addSource(SourceFunction).
Transformations on a DataStream operate element by element, e.g. map(), flatMap(), filter().
Custom transformation functions
1. Functions
Scala function
data.flatMap(f => f.split(" "))
Java lambda
data.flatMap(f -> f.split(" "));
2. Implementing the interface

    text.flatMap(new FlatMapFunction[String, String] {
      override def flatMap(value: String, out: Collector[String]) = {
        val strings: Array[String] = value.split(" ")
        for (s <- strings) {
          out.collect(s)
        }
      }
    })

3. Rich Functions
Rich functions provide four very useful methods: open, close, getRuntimeContext and setRuntimeContext. They are helpful for creating local state, accessing broadcast variables, accessing runtime information (such as accumulators and counters) and iteration information.
Example code:

    import java.util.Properties

    import org.apache.flink.api.common.functions.RichFlatMapFunction
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.util.Collector
    import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerRecord}

    class HainiuRichFlatMapFunction(topic: String, props: Properties) extends RichFlatMapFunction[String, Int] {

      var producer: Producer[String, String] = _

      override def open(parameters: Configuration): Unit = {
        // create the Kafka producer
        producer = new KafkaProducer[String, String](props)
      }

      override def close(): Unit = {
        // close the Kafka producer
        producer.close()
      }

      override def flatMap(value: String, out: Collector[Int]): Unit = {
        // use the RuntimeContext to get the subtask index, e.g. for writing files from multiple threads
        println(getRuntimeContext.getIndexOfThisSubtask)
        // send the data to Kafka
        producer.send(new ProducerRecord[String, String](topic, value))
      }
    }

Operators
1. connect and union (merging streams)

  • connect produces ConnectedStreams, which lets you apply different processing to the two streams while sharing state between them (e.g. a counter). This is very useful when the input of the first stream should influence how the second stream is processed. union merges multiple streams; the new stream contains the data of all input streams.
  • union is DataStream → DataStream
  • connect can only connect two streams, whereas union can connect more than two
  • the two streams passed to connect may have different types, whereas all streams passed to union must have the same type

img
Example code:
union:

  1. package cn.zhanghub.operator;
  2. import cn.zhanghub.source.FileCountryDictSourceFunction;
  3. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  4. import org.apache.flink.streaming.api.datastream.DataStream;
  5. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  6. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import org.apache.flink.streaming.api.functions.ProcessFunction;
  9. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
  10. import org.apache.flink.util.Collector;
  11. import java.util.HashMap;
  12. import java.util.Map;
  13. import java.util.Properties;
  14. public class CountryCodeUnion {
  15. public static void main(String[] args) throws Exception {
  16. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  17. env.setParallelism(1);
  18. DataStreamSource<String> countryDictSource = env.addSource(new FileCountryDictSourceFunction());
  19. Properties kafkaConsumerProps = new Properties();
  20. kafkaConsumerProps.setProperty("bootstrap.servers", "s1.hadoop:9092,s2.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
  21. kafkaConsumerProps.setProperty("group.id", "qingniuflink");
  22. kafkaConsumerProps.setProperty("flink.partition-discovery.interval-millis", "30000");
  23. FlinkKafkaConsumer010<String> kafkaSource = new FlinkKafkaConsumer010<>("flink_event", new SimpleStringSchema(), kafkaConsumerProps);
  24. // kafkaSource.setStartFromEarliest()
  25. // kafkaSource.setStartFromGroupOffsets()
  26. kafkaSource.setStartFromLatest();
  27. DataStreamSource<String> kafkainput = env.addSource(kafkaSource);
  28. DataStream<String> union = countryDictSource.union(kafkainput);
  29. SingleOutputStreamOperator<String> process = union.process(new ProcessFunction<String, String>() {
  30. private Map<String, String> map = new HashMap<>();
  31. @Override
  32. public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
  33. String[] split = value.split("\t");
  34. if (split.length > 1) {
  35. map.put(split[0], split[1]);
  36. out.collect(value);
  37. } else {
  38. String countryName = map.get(value);
  39. String outStr = countryName == null ? "no match" : countryName;
  40. out.collect(outStr);
  41. }
  42. }
  43. });
  44. process.print();
  45. env.execute();
  46. }
  47. }

connect:

  1. package cn.zhanghub.operator;
  2. import cn.zhanghub.source.FileCountryDictSourceFunction;
  3. import cn.zhanghub.source.HainiuKafkaRecord;
  4. import cn.zhanghub.source.HainiuKafkaRecordSchema;
  5. import org.apache.flink.streaming.api.datastream.ConnectedStreams;
  6. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  7. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  8. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  9. import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
  10. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
  11. import org.apache.flink.util.Collector;
  12. import java.util.HashMap;
  13. import java.util.Map;
  14. import java.util.Properties;
  15. public class CountryCodeConnect {
  16. public static void main(String[] args) throws Exception {
  17. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  18. env.setParallelism(1);
  19. DataStreamSource<String> countryDictSource = env.addSource(new FileCountryDictSourceFunction());
  20. Properties kafkaConsumerProps = new Properties();
  21. kafkaConsumerProps.setProperty("bootstrap.servers", "s1.hadoop:9092,s2.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
  22. kafkaConsumerProps.setProperty("group.id", "qingniuflink");
  23. kafkaConsumerProps.setProperty("flink.partition-discovery.interval-millis", "30000");
  24. FlinkKafkaConsumer010<HainiuKafkaRecord> kafkaSource = new FlinkKafkaConsumer010<>("flink_event", new HainiuKafkaRecordSchema(), kafkaConsumerProps);
  25. // kafkaSource.setStartFromEarliest()
  26. // kafkaSource.setStartFromGroupOffsets()
  27. kafkaSource.setStartFromLatest();
  28. DataStreamSource<HainiuKafkaRecord> kafkainput = env.addSource(kafkaSource);
  29. ConnectedStreams<String, HainiuKafkaRecord> connect = countryDictSource.connect(kafkainput);
  30. SingleOutputStreamOperator<String> connectInput = connect.process(new CoProcessFunction<String, HainiuKafkaRecord, String>() {
  31. private Map<String, String> map = new HashMap<String, String>();
  32. @Override
  33. public void processElement1(String value, Context ctx, Collector<String> out) throws Exception {
  34. String[] split = value.split("\t");
  35. map.put(split[0], split[1]);
  36. out.collect(value);
  37. }
  38. @Override
  39. public void processElement2(HainiuKafkaRecord value, Context ctx, Collector<String> out) throws Exception {
  40. String countryCode = value.getRecord();
  41. String countryName = map.get(countryCode);
  42. String outStr = countryName == null ? "no match" : countryName;
  43. out.collect(outStr);
  44. }
  45. });
  46. connectInput.print();
  47. env.execute();
  48. }
  49. }

2. keyBy

  • Meaning: groups the stream by the given key (logically splits the DataStream into disjoint partitions; events with the same key end up in the same partition, implemented with hash-style partitioning)
  • Transformation: DataStream → KeyedStream
  • Use cases: grouping (analogous to grouping in SQL), e.g. join, coGroup, keyBy, groupBy, Reduce, GroupReduce, Aggregate, Windows

KeyedStream

  • A KeyedStream represents a data stream grouped by the given key.
  • A KeyedStream is obtained by calling DataStream.keyBy().
  • Any transformation applied on a KeyedStream turns it back into a DataStream.
  • Internally, the KeyedStream passes the key information into the operator's functions.

Example code:

  1. package cn.zhanghub.operator;
  2. import cn.zhanghub.source.FileCountryDictSourceFunction;
  3. import cn.zhanghub.source.HainiuKafkaRecord;
  4. import cn.zhanghub.source.HainiuKafkaRecordSchema;
  5. import org.apache.flink.api.common.functions.MapFunction;
  6. import org.apache.flink.api.java.functions.KeySelector;
  7. import org.apache.flink.api.java.tuple.Tuple2;
  8. import org.apache.flink.streaming.api.datastream.ConnectedStreams;
  9. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  10. import org.apache.flink.streaming.api.datastream.KeyedStream;
  11. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  12. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  13. import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
  14. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
  15. import org.apache.flink.util.Collector;
  16. import java.util.HashMap;
  17. import java.util.Map;
  18. import java.util.Properties;
  19. public class CountryCodeConnectKeyBy {
  20. public static void main(String[] args) throws Exception {
  21. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  22. DataStreamSource<String> countryDictSource = env.addSource(new FileCountryDictSourceFunction());
  23. Properties kafkaConsumerProps = new Properties();
  24. kafkaConsumerProps.setProperty("bootstrap.servers", "s1.hadoop:9092,s2.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
  25. kafkaConsumerProps.setProperty("group.id", "qingniuflink");
  26. kafkaConsumerProps.setProperty("flink.partition-discovery.interval-millis", "30000");
  27. FlinkKafkaConsumer010<HainiuKafkaRecord> kafkaSource = new FlinkKafkaConsumer010<>("flink_event", new HainiuKafkaRecordSchema(), kafkaConsumerProps);
  28. // kafkaSource.setStartFromEarliest()
  29. // kafkaSource.setStartFromGroupOffsets()
  30. kafkaSource.setStartFromLatest();
  31. DataStreamSource<HainiuKafkaRecord> kafkainput = env.addSource(kafkaSource);
  32. KeyedStream<Tuple2<String, String>, String> countryDictKeyBy = countryDictSource.map(new MapFunction<String, Tuple2<String, String>>() {
  33. @Override
  34. public Tuple2<String, String> map(String value) throws Exception {
  35. String[] split = value.split("\t");
  36. return Tuple2.of(split[0], split[1]);
  37. }
  38. }).keyBy(new KeySelector<Tuple2<String, String>, String>() {
  39. @Override
  40. public String getKey(Tuple2<String, String> value) throws Exception {
  41. return value.f0;
  42. }
  43. });
  44. KeyedStream<HainiuKafkaRecord, String> record = kafkainput.keyBy(new KeySelector<HainiuKafkaRecord, String>() {
  45. @Override
  46. public String getKey(HainiuKafkaRecord value) throws Exception {
  47. return value.getRecord();
  48. }
  49. });
  50. ConnectedStreams<Tuple2<String, String>, HainiuKafkaRecord> connect = countryDictKeyBy.connect(record);
  51. SingleOutputStreamOperator<String> connectInput = connect.process(new KeyedCoProcessFunction<String, Tuple2<String, String>, HainiuKafkaRecord, String>() {
  52. private Map<String, String> map = new HashMap<String, String>();
  53. @Override
  54. public void processElement1(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
  55. map.put(ctx.getCurrentKey(), value.f1);
  56. out.collect(value.toString());
  57. }
  58. @Override
  59. public void processElement2(HainiuKafkaRecord value, Context ctx, Collector<String> out) throws Exception {
  60. String countryCode = ctx.getCurrentKey();
  61. String countryName = map.get(countryCode);
  62. String outStr = countryName == null ? "no match" : countryName;
  63. out.collect(outStr);
  64. }
  65. });
  66. connectInput.print();
  67. env.execute();
  68. }
  69. }

Restrictions on the key type:

  • It must not be a POJO (i.e. a bean) that does not override hashCode
  • It must not be an array

POJO:

  1. package cn.zhanghub.source;
  2. public class HainiuKafkaRecord {
  3. private String record;
  4. public HainiuKafkaRecord(String record) {
  5. this.record = record;
  6. }
  7. public String getRecord() {
  8. return record;
  9. }
  10. public void setRecord(String record) {
  11. this.record = record;
  12. }
  13. @Override
  14. public int hashCode() {
  15. final int prime = 31;
  16. int result = 1;
  17. result = prime * result + ((record == null) ? 0 : record.hashCode());
  18. return result;
  19. }
  20. }

Example code:

  1. package cn.zhanghub.operator;
  2. import cn.zhanghub.source.FileCountryDictSourceFunction;
  3. import cn.zhanghub.source.HainiuKafkaRecord;
  4. import cn.zhanghub.source.HainiuKafkaRecordSchema;
  5. import org.apache.flink.api.common.functions.MapFunction;
  6. import org.apache.flink.api.java.functions.KeySelector;
  7. import org.apache.flink.api.java.tuple.Tuple2;
  8. import org.apache.flink.streaming.api.datastream.ConnectedStreams;
  9. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  10. import org.apache.flink.streaming.api.datastream.KeyedStream;
  11. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  12. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  13. import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
  14. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
  15. import org.apache.flink.util.Collector;
  16. import java.util.HashMap;
  17. import java.util.Map;
  18. import java.util.Properties;
  19. public class CountryCodeConnectKeyByObject {
  20. public static void main(String[] args) throws Exception {
  21. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  22. DataStreamSource<String> countryDictSource = env.addSource(new FileCountryDictSourceFunction());
  23. Properties kafkaConsumerProps = new Properties();
  24. kafkaConsumerProps.setProperty("bootstrap.servers", "s1.hadoop:9092,s2.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
  25. kafkaConsumerProps.setProperty("group.id", "qingniuflink");
  26. kafkaConsumerProps.setProperty("flink.partition-discovery.interval-millis", "30000");
  27. FlinkKafkaConsumer010<HainiuKafkaRecord> kafkaSource = new FlinkKafkaConsumer010<>("flink_event", new HainiuKafkaRecordSchema(), kafkaConsumerProps);
  28. // kafkaSource.setStartFromEarliest()
  29. // kafkaSource.setStartFromGroupOffsets()
  30. kafkaSource.setStartFromLatest();
  31. DataStreamSource<HainiuKafkaRecord> kafkainput = env.addSource(kafkaSource);
  32. KeyedStream<Tuple2<HainiuKafkaRecord, String>, HainiuKafkaRecord> countryDictKeyBy = countryDictSource.map(new MapFunction<String, Tuple2<HainiuKafkaRecord, String>>() {
  33. @Override
  34. public Tuple2<HainiuKafkaRecord, String> map(String value) throws Exception {
  35. String[] split = value.split("\t");
  36. return Tuple2.of(new HainiuKafkaRecord(new String(split[0])), split[1]);
  37. }
  38. }).keyBy(new KeySelector<Tuple2<HainiuKafkaRecord, String>, HainiuKafkaRecord>() {
  39. @Override
  40. public HainiuKafkaRecord getKey(Tuple2<HainiuKafkaRecord, String> value) throws Exception {
  41. return value.f0;
  42. }
  43. });
  44. KeyedStream<HainiuKafkaRecord, HainiuKafkaRecord> record = kafkainput.keyBy(new KeySelector<HainiuKafkaRecord, HainiuKafkaRecord>() {
  45. @Override
  46. public HainiuKafkaRecord getKey(HainiuKafkaRecord value) throws Exception {
  47. return value;
  48. }
  49. });
  50. ConnectedStreams<Tuple2<HainiuKafkaRecord, String>, HainiuKafkaRecord> connect = countryDictKeyBy.connect(record);
  51. SingleOutputStreamOperator<String> connectInput = connect.process(new KeyedCoProcessFunction<HainiuKafkaRecord, Tuple2<HainiuKafkaRecord, String>, HainiuKafkaRecord, String>() {
  52. private Map<String, String> map = new HashMap<String, String>();
  53. @Override
  54. public void processElement1(Tuple2<HainiuKafkaRecord, String> value, Context ctx, Collector<String> out) throws Exception {
  55. String currentKey = ctx.getCurrentKey().getRecord();
  56. map.put(currentKey, value.f1);
  57. out.collect(value.toString());
  58. }
  59. @Override
  60. public void processElement2(HainiuKafkaRecord value, Context ctx, Collector<String> out) throws Exception {
  61. HainiuKafkaRecord currentKey = ctx.getCurrentKey();
  62. String countryName = map.get(currentKey.getRecord());
  63. String outStr = countryName == null ? "no match" : countryName;
  64. out.collect(currentKey.toString() + "--" + outStr);
  65. }
  66. });
  67. connectInput.print();
  68. env.execute();
  69. }
  70. }

*Data skew may still occur; depending on the situation it can be addressed with physical partitioning
3. Physical partitioning
Data transfer patterns between operators

  • One-to-one streams preserve the partitioning and ordering of the elements
  • Redistributing streams

change the partitioning of the stream; how depends on the operator used

  • keyBy() (re-partitions by hashing the key)
  • broadcast()
  • rebalance() (re-partitions randomly)

All of these are transformations and all of them change the partitioning

| Partitioning transformation | Description |
| --- | --- |
| Random partitioning | Partitions elements randomly according to a uniform distribution; the network overhead is often significant: dataStream.shuffle() |
| Round-robin partitioning | Partitions elements round-robin, creating an equal load per partition; very useful when the data is skewed: dataStream.rebalance() |
| Rescaling | Similar to rebalance, but not global: distributes elements round-robin from a subset of upstream tasks to a subset of downstream tasks: dataStream.rescale() |
| Broadcasting | Broadcasts every element to all partitions: dataStream.broadcast() |
| Custom partitioning | dataStream.partitionCustom(partitioner, "someKey") or dataStream.partitionCustom(partitioner, 0) |

4. Dealing with data skew
1). For One-to-one streams:
rebalance

  • Meaning: re-balances the stream to reduce data skew
  • Transformation: DataStream → DataStream
  • Use case: handling data skew, e.g. when some Kafka partitions carry much more data than others

Example code:

    val stream: DataStream[MyType] = env.addSource(new FlinkKafkaConsumer08[String](...))
    val str1: DataStream[(String, MyType)] = stream.flatMap { ... }
    val str2: DataStream[(String, MyType)] = str1.rebalance()
    val str3: DataStream[AnotherType] = str2.map { ... }

At runtime the transformations on the DataStream above turn into the following execution graph:
img
As the execution graph shows, the operators of the DataStream run in parallel, and between operators the data stream is partitioned. For example, the first parallel instance of the Source (S1) and the first parallel instance of flatMap() (m1) form one stream partition. Because rebalance() was added between flatMap() and map(), the stream between them is split into 3 sub-partitions (the data of m1 flows to all 3 map() instances).
rescale

  • Principle: distributes elements round-robin from a subset of upstream tasks to a subset of downstream tasks
  • Transformation: DataStream → DataStream
  • Use case: keeping data transfer inside a single TaskManager, avoiding the network.

How it works:
The first task has parallelism 2, the second parallelism 6, and the third parallelism 2. From the first task to the second, the Src subtask Src1 is paired with the Map subtasks Map1,2,3, and Src1 sends its records to Map1,2,3 in round-robin fashion. From the second task to the third, the Map subtasks 1,2,3 are paired with Sink subtask 1, so the elements of those three streams are only sent to Sink1. Assuming each TaskManager has three slots and slot sharing is enabled, with rescale all data transfer stays within one TaskManager and never crosses the network.
img
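
A hedged sketch of the parallelism layout described above (it reuses the env and the String-typed kafkaSource from the earlier Kafka source example; the map logic is a placeholder):

```java
DataStream<String> src = env.addSource(kafkaSource).setParallelism(2);

src.rescale()                                                  // 2 source subtasks -> 6 mappers
   .map(value -> value.toUpperCase())
   .returns(Types.STRING)                                      // help type extraction for the lambda
   .setParallelism(6)
   .rescale()                                                  // 6 mappers -> 2 sink subtasks
   .print()
   .setParallelism(2);
```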
2). For Redistributing streams:
Custom partitioner

  • Transformation: DataStream → DataStream
  • Use case: custom load distribution
  • How to implement:
  • implement the org.apache.flink.api.common.functions.Partitioner interface
  • override the partition method
  • design the algorithm that returns the partition id

Example code:

  1. package cn.zhanghub.operator;
  2. import cn.zhanghub.source.FileCountryDictSourceFunction;
  3. import cn.zhanghub.source.HainiuKafkaRecord;
  4. import cn.zhanghub.source.HainiuKafkaRecordSchema;
  5. import org.apache.flink.api.common.functions.MapFunction;
  6. import org.apache.flink.api.common.functions.Partitioner;
  7. import org.apache.flink.api.java.functions.KeySelector;
  8. import org.apache.flink.api.java.tuple.Tuple2;
  9. import org.apache.flink.streaming.api.datastream.ConnectedStreams;
  10. import org.apache.flink.streaming.api.datastream.DataStream;
  11. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  12. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  13. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  14. import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
  15. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
  16. import org.apache.flink.util.Collector;
  17. import java.util.HashMap;
  18. import java.util.Map;
  19. import java.util.Properties;
  20. public class CountryCodeConnectCustomPartitioner {
  21. public static void main(String[] args) throws Exception {
  22. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  23. DataStreamSource<String> countryDictSource = env.addSource(new FileCountryDictSourceFunction());
  24. Properties kafkaConsumerProps = new Properties();
  25. kafkaConsumerProps.setProperty("bootstrap.servers", "s1.hadoop:9092,s2.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
  26. kafkaConsumerProps.setProperty("group.id", "qingniuflink");
  27. kafkaConsumerProps.setProperty("flink.partition-discovery.interval-millis", "30000");
  28. FlinkKafkaConsumer010<HainiuKafkaRecord> kafkaSource = new FlinkKafkaConsumer010<>("flink_event", new HainiuKafkaRecordSchema(), kafkaConsumerProps);
  29. // kafkaSource.setStartFromEarliest()
  30. // kafkaSource.setStartFromGroupOffsets()
  31. kafkaSource.setStartFromLatest();
  32. DataStreamSource<HainiuKafkaRecord> kafkainput = env.addSource(kafkaSource);
  33. DataStream<Tuple2<String, String>> countryDictPartition = countryDictSource.map(new MapFunction<String, Tuple2<String, String>>() {
  34. @Override
  35. public Tuple2<String, String> map(String value) throws Exception {
  36. String[] split = value.split("\t");
  37. return Tuple2.of(split[0], split[1]);
  38. }
  39. }).partitionCustom(new Partitioner<String>() {
  40. @Override
  41. public int partition(String key, int numPartitions) {
  42. if (key.contains("CN")) {
  43. return 0;
  44. } else {
  45. return 1;
  46. }
  47. }
  48. }, new KeySelector<Tuple2<String, String>, String>() {
  49. @Override
  50. public String getKey(Tuple2<String, String> value) throws Exception {
  51. return value.f0;
  52. }
  53. });
  54. DataStream<HainiuKafkaRecord> recordPartition = kafkainput.partitionCustom(new Partitioner<String>() {
  55. @Override
  56. public int partition(String key, int numPartitions) {
  57. if (key.contains("CN")) {
  58. return 0;
  59. } else {
  60. return 1;
  61. }
  62. }
  63. }, new KeySelector<HainiuKafkaRecord, String>() {
  64. @Override
  65. public String getKey(HainiuKafkaRecord value) throws Exception {
  66. return value.getRecord();
  67. }
  68. });
  69. ConnectedStreams<Tuple2<String, String>, HainiuKafkaRecord> connect = countryDictPartition.connect(recordPartition);
  70. SingleOutputStreamOperator<String> connectInput = connect.process(new CoProcessFunction<Tuple2<String, String>, HainiuKafkaRecord, String>() {
  71. private Map<String, String> map = new HashMap<String, String>();
  72. @Override
  73. public void processElement1(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
  74. map.put(value.f0, value.f1);
  75. out.collect(value.toString());
  76. }
  77. @Override
  78. public void processElement2(HainiuKafkaRecord value, Context ctx, Collector<String> out) throws Exception {
  79. String countryCode = value.getRecord();
  80. String countryName = map.get(countryCode);
  81. String outStr = countryName == null ? "no match" : countryName;
  82. out.collect(outStr);
  83. }
  84. });
  85. connectInput.print();
  86. env.execute();
  87. }
  88. }

Using a partitioner to fix data skew:

  1. package cn.zhanghub.flink.operator;
  2. import cn.zhanghub.flink.source.FileCountryDictSourceFunction;
  3. import cn.zhanghub.flink.source.HainiuKafkaRecord;
  4. import cn.zhanghub.flink.source.HainiuKafkaRecordSchema;
  5. import org.apache.flink.api.common.functions.FlatMapFunction;
  6. import org.apache.flink.api.common.functions.MapFunction;
  7. import org.apache.flink.api.common.functions.Partitioner;
  8. import org.apache.flink.api.java.functions.KeySelector;
  9. import org.apache.flink.api.java.tuple.Tuple2;
  10. import org.apache.flink.configuration.Configuration;
  11. import org.apache.flink.streaming.api.datastream.ConnectedStreams;
  12. import org.apache.flink.streaming.api.datastream.DataStream;
  13. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  14. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  15. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  16. import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
  17. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
  18. import org.apache.flink.util.Collector;
  19. import java.util.HashMap;
  20. import java.util.Map;
  21. import java.util.Properties;
  22. import java.util.Random;
  23. public class CountryCodeConnectCustomPartitioner {
  24. public static void main(String[] args) throws Exception {
  25. StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
  26. Properties kafkaConsumerProps = new Properties();
  27. kafkaConsumerProps.setProperty("bootstrap.servers", "s1.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
  28. kafkaConsumerProps.setProperty("group.id", "qingniuflink");
  29. kafkaConsumerProps.setProperty("flink.partition-discovery.interval-millis", "30000");
  30. FlinkKafkaConsumer010<HainiuKafkaRecord> kafkaSource = new FlinkKafkaConsumer010<>("flink_event", new HainiuKafkaRecordSchema(), kafkaConsumerProps);
  31. kafkaSource.setStartFromLatest();
  32. DataStreamSource<HainiuKafkaRecord> kafkaInput = env.addSource(kafkaSource);
  33. DataStream<HainiuKafkaRecord> kafka = kafkaInput.map(new MapFunction<HainiuKafkaRecord, HainiuKafkaRecord>() {
  34. @Override
  35. public HainiuKafkaRecord map(HainiuKafkaRecord value) throws Exception {
  36. String record = value.getRecord();
  37. Random random = new Random();
  38. int i = random.nextInt(10);
  39. return new HainiuKafkaRecord(i + "_" + record);
  40. }
  41. }).partitionCustom(new Partitioner<HainiuKafkaRecord>() {
  42. @Override
  43. public int partition(HainiuKafkaRecord key, int numPartitions) {
  44. String[] s = key.getRecord().split("_");
  45. String randomId = s[0];
  46. return new Integer(randomId);
  47. }
  48. },
  49. new KeySelector<HainiuKafkaRecord, HainiuKafkaRecord>() {
  50. @Override
  51. public HainiuKafkaRecord getKey(HainiuKafkaRecord value) throws Exception {
  52. return value;
  53. }
  54. });
  55. DataStreamSource<String> countryDictSource = env.addSource(new FileCountryDictSourceFunction());
  56. DataStream<Tuple2<HainiuKafkaRecord, String>> countryDict = countryDictSource.flatMap(new FlatMapFunction<String, Tuple2<HainiuKafkaRecord, String>>() {
  57. @Override
  58. public void flatMap(String value, Collector<Tuple2<HainiuKafkaRecord, String>> out) throws Exception {
  59. String[] split = value.split("\t");
  60. String key = split[0];
  61. String values = split[1];
  62. for (int i = 0; i < 10; i++) {
  63. String randomKey = i + "_" + key;
  64. Tuple2<HainiuKafkaRecord, String> t2 = Tuple2.of(new HainiuKafkaRecord(randomKey), values);
  65. out.collect(t2);
  66. }
  67. }
  68. }).partitionCustom(new Partitioner<HainiuKafkaRecord>() {
  69. @Override
  70. public int partition(HainiuKafkaRecord key, int numPartitions) {
  71. String[] s = key.getRecord().split("_");
  72. String randomId = s[0];
  73. return new Integer(randomId);
  74. }
  75. }, new KeySelector<Tuple2<HainiuKafkaRecord, String>, HainiuKafkaRecord>() {
  76. @Override
  77. public HainiuKafkaRecord getKey(Tuple2<HainiuKafkaRecord, String> value) throws Exception {
  78. return value.f0;
  79. }
  80. });
  81. ConnectedStreams<Tuple2<HainiuKafkaRecord, String>, HainiuKafkaRecord> connect = countryDict.connect(kafka);
  82. SingleOutputStreamOperator<String> connectInput = connect.process(new CoProcessFunction<Tuple2<HainiuKafkaRecord, String>, HainiuKafkaRecord, String>() {
  83. private Map<String, String> map = new HashMap<String, String>();
  84. @Override
  85. public void processElement1(Tuple2<HainiuKafkaRecord, String> value, Context ctx, Collector<String> out) throws Exception {
  86. map.put(value.f0.getRecord(), value.f1);
  87. out.collect(value.toString());
  88. }
  89. @Override
  90. public void processElement2(HainiuKafkaRecord value, Context ctx, Collector<String> out) throws Exception {
  91. String countryName = map.get(value.getRecord());
  92. String outStr = countryName == null ? "no match" : countryName;
  93. out.collect(outStr);
  94. }
  95. });
  96. connectInput.print();
  97. env.execute();
  98. }
  99. }

5. reduce and fold

  • After grouping you naturally want to aggregate the grouped data, i.e. the KeyedStream
  • KeyedStream → DataStream
  • Aggregations on a KeyedStream are rolling (each result builds on the previous state); do not confuse them with batch aggregations (a DataSet aggregation is actually rolling too, it just only hands you the final result)

| Aggregation | Meaning |
| --- | --- |
| reduce | On a KeyedStream, combines the previous reduce result with the current element |
| fold | Folds the events of the KeyedStream into an accumulator |
| sum/min/minBy/max/maxBy | Special cases of reduce; the difference between min and minBy is that min returns the minimum value, while minBy returns the element whose field contains the minimum value (the same applies to max and maxBy) |
| process | The low-level operation underlying the aggregations |

Example code:

  1. package cn.zhanghub.operator;
  2. import cn.zhanghub.source.FileCountryDictSourceFunction;
  3. import cn.zhanghub.source.HainiuKafkaRecord;
  4. import cn.zhanghub.source.HainiuKafkaRecordSchema;
  5. import org.apache.flink.api.common.functions.MapFunction;
  6. import org.apache.flink.api.common.functions.ReduceFunction;
  7. import org.apache.flink.api.java.functions.KeySelector;
  8. import org.apache.flink.api.java.tuple.Tuple2;
  9. import org.apache.flink.streaming.api.datastream.ConnectedStreams;
  10. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  11. import org.apache.flink.streaming.api.datastream.KeyedStream;
  12. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  13. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  14. import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
  15. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
  16. import org.apache.flink.util.Collector;
  17. import org.apache.flink.util.OutputTag;
  18. import java.util.HashMap;
  19. import java.util.Map;
  20. import java.util.Properties;
  21. public class CountryCodeConnectKeyByCountryCount {
  22. public static void main(String[] args) throws Exception {
  23. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  24. DataStreamSource<String> countryDictSource = env.addSource(new FileCountryDictSourceFunction());
  25. Properties kafkaConsumerProps = new Properties();
  26. kafkaConsumerProps.setProperty("bootstrap.servers", "s1.hadoop:9092,s2.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
  27. kafkaConsumerProps.setProperty("group.id", "qingniuflink");
  28. kafkaConsumerProps.setProperty("flink.partition-discovery.interval-millis", "30000");
  29. FlinkKafkaConsumer010<HainiuKafkaRecord> kafkaSource = new FlinkKafkaConsumer010<>("flink_event", new HainiuKafkaRecordSchema(), kafkaConsumerProps);
  30. // kafkaSource.setStartFromEarliest()
  31. // kafkaSource.setStartFromGroupOffsets()
  32. kafkaSource.setStartFromLatest();
  33. DataStreamSource<HainiuKafkaRecord> kafkainput = env.addSource(kafkaSource);
  34. KeyedStream<Tuple2<String, String>, String> countryDictKeyBy = countryDictSource.map(new MapFunction<String, Tuple2<String, String>>() {
  35. @Override
  36. public Tuple2<String, String> map(String value) throws Exception {
  37. String[] split = value.split("\t");
  38. return Tuple2.of(split[0], split[1]);
  39. }
  40. }).keyBy(new KeySelector<Tuple2<String, String>, String>() {
  41. @Override
  42. public String getKey(Tuple2<String, String> value) throws Exception {
  43. return value.f0;
  44. }
  45. });
  46. KeyedStream<HainiuKafkaRecord, String> record = kafkainput.keyBy(new KeySelector<HainiuKafkaRecord, String>() {
  47. @Override
  48. public String getKey(HainiuKafkaRecord value) throws Exception {
  49. return value.getRecord();
  50. }
  51. });
  52. ConnectedStreams<Tuple2<String, String>, HainiuKafkaRecord> connect = countryDictKeyBy.connect(record);
  53. SingleOutputStreamOperator<Tuple2<String, Integer>> connectInput = connect.process(new KeyedCoProcessFunction<String, Tuple2<String, String>, HainiuKafkaRecord, Tuple2<String, Integer>>() {
  54. private Map<String, String> map = new HashMap<String, String>();
  55. @Override
  56. public void processElement1(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
  57. map.put(ctx.getCurrentKey(), value.f1);
  58. out.collect(Tuple2.of(value.f0, 1));
  59. }
  60. @Override
  61. public void processElement2(HainiuKafkaRecord value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
  62. String countryCode = ctx.getCurrentKey();
  63. String countryName = map.get(countryCode);
  64. String outStr = countryName == null ? "no match" : countryName;
  65. out.collect(Tuple2.of(countryName.substring(0, countryName.indexOf(" ")), 1));
  66. }
  67. });
  68. SingleOutputStreamOperator<Tuple2<String, Integer>> reduce = connectInput.keyBy(0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
  69. @Override
  70. public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
  71. return Tuple2.of(value1.f0, value1.f1 + value2.f1);
  72. }
  73. });
  74. reduce.print();
  75. env.execute();
  76. }
  77. }

6. OutputTag (splitting a stream)

  • Can only be used inside a process function
  • Emits different kinds of data to side outputs depending on a condition

Example code:

  1. package cn.zhanghub.operator;
  2. import cn.zhanghub.source.FileCountryDictSourceFunction;
  3. import cn.zhanghub.source.HainiuKafkaRecord;
  4. import cn.zhanghub.source.HainiuKafkaRecordSchema;
  5. import org.apache.flink.api.common.functions.MapFunction;
  6. import org.apache.flink.api.common.functions.ReduceFunction;
  7. import org.apache.flink.api.java.functions.KeySelector;
  8. import org.apache.flink.api.java.tuple.Tuple2;
  9. import org.apache.flink.streaming.api.datastream.ConnectedStreams;
  10. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  11. import org.apache.flink.streaming.api.datastream.KeyedStream;
  12. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  13. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  14. import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
  15. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
  16. import org.apache.flink.util.Collector;
  17. import org.apache.flink.util.OutputTag;
  18. import java.util.HashMap;
  19. import java.util.Map;
  20. import java.util.Properties;
  21. public class CountryCodeConnectKeyByCountryCountOutputTag {
  22. private static final OutputTag<String> ot = new OutputTag<String>("china") {
  23. };
  24. public static void main(String[] args) throws Exception {
  25. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  26. DataStreamSource<String> countryDictSource = env.addSource(new FileCountryDictSourceFunction());
  27. Properties kafkaConsumerProps = new Properties();
  28. kafkaConsumerProps.setProperty("bootstrap.servers", "s1.hadoop:9092,s2.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
  29. kafkaConsumerProps.setProperty("group.id", "qingniuflink");
  30. kafkaConsumerProps.setProperty("flink.partition-discovery.interval-millis", "30000");
  31. FlinkKafkaConsumer010<HainiuKafkaRecord> kafkaSource = new FlinkKafkaConsumer010<>("flink_event", new HainiuKafkaRecordSchema(), kafkaConsumerProps);
  32. // kafkaSource.setStartFromEarliest()
  33. // kafkaSource.setStartFromGroupOffsets()
  34. kafkaSource.setStartFromLatest();
  35. DataStreamSource<HainiuKafkaRecord> kafkainput = env.addSource(kafkaSource);
  36. KeyedStream<Tuple2<String, String>, String> countryDictKeyBy = countryDictSource.map(new MapFunction<String, Tuple2<String, String>>() {
  37. @Override
  38. public Tuple2<String, String> map(String value) throws Exception {
  39. String[] split = value.split("\t");
  40. return Tuple2.of(split[0], split[1]);
  41. }
  42. }).keyBy(new KeySelector<Tuple2<String, String>, String>() {
  43. @Override
  44. public String getKey(Tuple2<String, String> value) throws Exception {
  45. return value.f0;
  46. }
  47. });
  48. KeyedStream<HainiuKafkaRecord, String> record = kafkainput.keyBy(new KeySelector<HainiuKafkaRecord, String>() {
  49. @Override
  50. public String getKey(HainiuKafkaRecord value) throws Exception {
  51. return value.getRecord();
  52. }
  53. });
  54. ConnectedStreams<Tuple2<String, String>, HainiuKafkaRecord> connect = countryDictKeyBy.connect(record);
  55. SingleOutputStreamOperator<Tuple2<String, Integer>> connectInput = connect.process(new KeyedCoProcessFunction<String, Tuple2<String, String>, HainiuKafkaRecord, Tuple2<String, Integer>>() {
  56. private Map<String, String> map = new HashMap<String, String>();
  57. @Override
  58. public void processElement1(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
  59. map.put(ctx.getCurrentKey(), value.f1);
  60. out.collect(Tuple2.of(value.f0, 1));
  61. }
  62. @Override
  63. public void processElement2(HainiuKafkaRecord value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
  64. String countryCode = ctx.getCurrentKey();
  65. String countryName = map.get(countryCode);
  66. String outStr = countryName == null ? "no match" : countryName;
  67. if (outStr.contains("中国")) {
  68. ctx.output(ot, outStr);
  69. }
  70. out.collect(Tuple2.of(countryName.substring(0, countryName.indexOf(" ")), 1));
  71. }
  72. });
  73. SingleOutputStreamOperator<Tuple2<String, Integer>> reduce = connectInput.keyBy(1).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
  74. @Override
  75. public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
  76. return Tuple2.of(value1.f0, value1.f1 + value2.f1);
  77. }
  78. });
  79. reduce.print();
  80. connectInput.getSideOutput(ot).print();
  81. env.execute();
  82. }
  83. }

4. Sink

Built-in data outputs

  1. File-based

    // uses TextOutputFormat
    stream.writeAsText("/path/to/file")
    // uses CsvOutputFormat
    stream.writeAsCsv("/path/to/file")

  2. Socket-based

    stream.writeToSocket(host, port, SerializationSchema)

  3. Standard / error output (note: never use these in a production job; use sampled printing or logging instead)

    stream.print()
    stream.printToErr()

Custom data outputs

  • Implement SinkFunction, or extend RichSinkFunction (without explicitly changing the parallelism, whether the sink runs in parallel is determined by its parent operator)

1. Implementing RichSinkFunction

  • Example: writing the stream to files on HDFS

Example code:
function:

    package cn.zhanghub.sink;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class HDFSSinkFunction extends RichSinkFunction<String> {

        private FileSystem fs = null;
        private SimpleDateFormat sf = null;
        private String pathStr = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
            fs = FileSystem.get(conf);
            sf = new SimpleDateFormat("yyyyMMddHH");
            pathStr = "hdfs://ns1/user/qingniu/flinkstreaminghdfs";
        }

        @Override
        public void close() throws Exception {
            fs.close();
        }

        @Override
        public void invoke(String value, Context context) throws Exception {
            if (null != value) {
                String format = sf.format(new Date());
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                StringBuilder sb = new StringBuilder();
                sb.append(pathStr).append("/").append(indexOfThisSubtask).append("_").append(format);
                Path path = new Path(sb.toString());
                FSDataOutputStream fsd = null;
                if (fs.exists(path)) {
                    fsd = fs.append(path);
                } else {
                    fsd = fs.create(path);
                }
                fsd.write((value + "\n").getBytes("UTF-8"));
                fsd.close();
            }
        }
    }

Driver class:

    package cn.zhanghub.sink;

    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class HDFSFile {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> source = env.socketTextStream("localhost", 6666);
            source.addSink(new HDFSSinkFunction());
            env.execute();
        }
    }

2. Kafka-connector-sink
![img](https://myblog-1258908231.cos.ap-shanghai.myqcloud.com/hexo/20210702040043.gif)
1. FlinkKafkaProducer010 constructors:

    FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema serializationSchema)
    FlinkKafkaProducer010(String topicId, SerializationSchema serializationSchema, Properties producerConfig)
    FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema serializationSchema)
    FlinkKafkaProducer010(String topicId, KeyedSerializationSchema serializationSchema, Properties producerConfig)
    FlinkKafkaProducer010(String topicId, SerializationSchema serializationSchema, Properties producerConfig, @Nullable FlinkKafkaPartitioner customPartitioner)
    FlinkKafkaProducer010(String topicId, KeyedSerializationSchema serializationSchema, Properties producerConfig, @Nullable FlinkKafkaPartitioner customPartitioner)

2. Common serialization schemas
· TypeInformationKeyValueSerializationSchema
· SimpleStringSchema

3. Custom serialization schemas

  • Implement the KeyedSerializationSchema interface

![img](https://myblog-1258908231.cos.ap-shanghai.myqcloud.com/hexo/20210702040040.gif)
Example code:

    case class KafkaEventP(message: String, eventTime: Long)

    // custom serializer used to serialize objects into Kafka
    class KafkaEventPKeyedSerializationSchema extends KeyedSerializationSchema[KafkaEventP] {

      // serialize the Kafka key
      override def serializeKey(element: KafkaEventP): Array[Byte] = {
        element.message.getBytes()
      }

      // serialize the Kafka value
      override def serializeValue(element: KafkaEventP): Array[Byte] = {
        s"hainiuprocessed${element.message}".getBytes()
      }

      // the target topic may be null because it is already specified when the sink is created
      override def getTargetTopic(element: KafkaEventP): String = {
        null
      }
    }

4. producerConfig
Configuration of the KafkaProducer used internally by FlinkKafkaProducer: https://kafka.apache.org/documentation.html
Example code:

    Properties producerPropsSns = new Properties();
    producerPropsSns.setProperty("bootstrap.servers", "s1.hadoop:9092,s2.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
    producerPropsSns.setProperty("retries", "3");

5. FlinkKafkaPartitioner

  • By default FlinkFixedPartitioner is used, i.e. each subtask writes its data to the same Kafka partition
  • Custom partitioner: extend FlinkKafkaPartitioner

Example code:
Partitioner:

    package cn.zhanghub.sink;

    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

    public class HainiuFlinkPartitioner extends FlinkKafkaPartitioner {
        @Override
        public int partition(Object record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            return 1;
        }
    }

Driver class:

    package cn.zhanghub.sink;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;

    import java.util.Properties;

    public class KafkaRichParallelSink {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> source = env.socketTextStream("localhost", 6666);
            Properties producerPropsSns = new Properties();
            producerPropsSns.setProperty("bootstrap.servers", "s1.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
            producerPropsSns.setProperty("retries", "3");
            // the FlinkKafkaProducer010 constructor accepts a custom Kafka partitioner
            FlinkKafkaProducer010 kafkaOut = new FlinkKafkaProducer010<String>("flink_event_result",
                    new SimpleStringSchema(),
                    producerPropsSns, new HainiuFlinkPartitioner());
            source.addSink(kafkaOut);
            env.execute();
        }
    }

## II. State and Fault Tolerance

## 1. Flink's recovery mechanism

**1. Fault tolerance via restart strategy configuration**
· Flink supports different restart strategies; they control how a job is restarted when a failure occurs

| Restart Strategy | Config key | Default | Description |
| --- | --- | --- | --- |
| Fixed delay | restart-strategy: fixed-delay | | If the maximum number of attempts is exceeded, the job eventually fails. Waits a fixed amount of time between two consecutive restart attempts. |
| | restart-strategy.fixed-delay.attempts: 3 | 1, or Integer.MAX_VALUE (when checkpointing is enabled but no restart strategy was configured) | |
| | restart-strategy.fixed-delay.delay: 10s | akka.ask.timeout, or 10s (when checkpointing is enabled but no restart strategy was configured) | |
| Failure rate | restart-strategy: failure-rate | | Restarts the job after a failure, but once the failure rate (failures per interval) is exceeded the job eventually fails. Waits a fixed amount of time between two consecutive restart attempts. |
| | restart-strategy.failure-rate.max-failures-per-interval: 3 | 1 | |
| | restart-strategy.failure-rate.failure-rate-interval: 5min | 1 minute | |
| | restart-strategy.failure-rate.delay: 10s | akka.ask.timeout | |
| No restart | restart-strategy: none | | If checkpointing is not enabled, the no-restart strategy is used. |

· Restart strategies can be configured globally in flink-conf.yaml, or specified dynamically in the application code, which overrides the global configuration.
Fixed delay in code:

    env.setRestartStrategy(
        RestartStrategies.fixedDelayRestart(
            3,
            Time.of(0, TimeUnit.SECONDS)
        )
    )

Example code:

    package cn.zhanghub.state;

    import cn.zhanghub.source.FileCountryDictSourceFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import java.util.concurrent.TimeUnit;

    public class FileSourceRestart {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // restart strategy
            env.setRestartStrategy(
                    RestartStrategies.fixedDelayRestart(
                            3, // number of restart attempts
                            Time.of(0, TimeUnit.SECONDS) // delay
                    )
            );
            DataStreamSource<String> stringDataStreamSource = env.addSource(new FileCountryDictSourceFunction());
            stringDataStreamSource.map(new MapFunction<String, String>() {
                @Override
                public String map(String value) throws Exception {
                    System.out.println(value);
                    if (value.contains("中国")) {
                        int a = 1 / 0;
                    }
                    return value;
                }
            }).print();
            env.execute();
        }
    }
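
The failure-rate strategy from the table can be set in code in the same way; a hedged sketch (the numbers are example values, and it reuses the env, RestartStrategies, Time and TimeUnit imports from the class above):

```java
env.setRestartStrategy(
        RestartStrategies.failureRateRestart(
                3,                             // max failures per measuring interval
                Time.of(5, TimeUnit.MINUTES),  // measuring interval
                Time.of(10, TimeUnit.SECONDS)  // delay between restart attempts
        )
);
```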

The configuration above gives your program extra lives, but there is still a question: can it not only add lives but also save progress? That is what checkpoints combined with state are for.

## 2. Flink State

### 1. What is state?
· State in Flink generally means the in-memory state of a specific task/operator at a given moment (for example, the current value of some attribute)

### 2. What state is used for

  • Incremental computation
    • aggregations
    • training models in machine learning
    • and so on
  • Fault tolerance
    • restarting a job after a failure
    • upgrades

### 3. Life without state

  • Storm + HBase: the state is stored in HBase, read back for each computation, updated and written again. This has several problems:
    • The streaming tasks and the HBase storage may not live on the same machines, so performance is poor: every access is remote and goes through the network and storage.
    • Backup and recovery are difficult because HBase has no rollback, so exactly-once is very hard to achieve. In a distributed environment, if the program fails you can only restart Storm, and the data in HBase cannot be rolled back to its previous state. For a scenario like ad billing, Storm + HBase therefore does not work (you may over-charge); the workaround is Storm + MySQL, using MySQL rollback to solve the consistency problem, but the architecture becomes very complex and performance suffers because every update must be committed to guarantee consistency.
    • For Storm, partitioning the state data and scaling dynamically are also very hard, and a serious problem is that every team has to re-implement all of this on top of Storm (search, advertising, and so on each do it again), which limits how fast the business can grow.

### 4. Stateful computation in Flink
![img](https://myblog-1258908231.cos.ap-shanghai.myqcloud.com/hexo/20210702040034.jpg)

### 5. Flink's rich state access and efficient fault tolerance
![img](https://myblog-1258908231.cos.ap-shanghai.myqcloud.com/hexo/20210702040032.jpg)

### 6. State categories

  • Operator State
  • Keyed State
  • Special: Broadcast State (since 1.5)
  32. #### 1).Operator State
  33. -
  34. 绑定到特定operator并行实例,每个operator的并行实例维护一个状态
  35. -
  36. key无关
  37. -
  38. 思考:一个并行度为3source有几个状态(只考虑一个算子需要一个逻辑状态的情形)
  39. -
  40. 支持的数据类型
  41. -
  42. - ListState
  43. -
  44. 例子:FlinkKafkaConsumer
  45. -
  46. - 每个Kafka Consumer实例都维护一个topic分区和偏移量的映射作为其操作状态。
  47. ![img](https://myblog-1258908231.cos.ap-shanghai.myqcloud.com/hexo/20210702040030.jpg)
#### 2). Keyed State

- State on top of a KeyedStream (dataStream.keyBy(...)); it can only be used in functions/operators applied to a KeyedStream.
- It is operator state after a keyBy, i.e. it can be understood as partitioned operator state.
- Every parallel instance of a keyed operator keeps one keyed state per key: <parallel-operator-instance, key> identifies one unique state, and since every key belongs to exactly one parallel instance this can be simplified to <operator, key>.
- Exercise: how many states does a keyed operator with parallelism 2 have (assuming the operator needs one logical state)?

![img](https://myblog-1258908231.cos.ap-shanghai.myqcloud.com/hexo/20210702040027.gif)

- Supported data structures:
  - ValueState<T>: keeps a single value that can be updated and retrieved
    - update(T)
    - value()
  - ListState<T>: keeps a list of elements
    - add(T)
    - addAll(List<T>)
    - get()
    - clear()
  - ReducingState<T>: keeps a single value that represents the aggregation of all values added to the state
    - add(T)
  - AggregatingState<IN, OUT>: keeps a single value that represents the aggregation of all values added to the state (unlike ReducingState, the elements added may have a different type than the aggregate)
    - add(IN)
  - FoldingState<T, ACC>: deprecated
    - add(T)
  - MapState<UK, UV>: keeps a map
    - put(UK, UV)
    - putAll(Map<UK, UV>)
    - get(UK)

![img](https://myblog-1258908231.cos.ap-shanghai.myqcloud.com/hexo/20210702040024.jpg)

#### 3). Notes:

- State does not necessarily live in memory; it may reside on disk or elsewhere.
- State is accessed through the RuntimeContext and can therefore only be used in rich functions or process functions; a minimal sketch follows.
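As a rough illustration of that last point (my own sketch, not the article's code; the class name and state name are made up), a RichFlatMapFunction obtains keyed state from the RuntimeContext in open() and uses it per key:

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountPerKey extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    private transient ValueState<Integer> countState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // keyed state is obtained through the RuntimeContext, hence the Rich function
        ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>("count", Integer.class);
        countState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
        Integer current = countState.value();            // scoped to the current key
        int updated = (current == null ? 0 : current) + value.f1;
        countState.update(updated);
        out.collect(Tuple2.of(value.f0, updated));
    }
}
```

It would be used after a keyBy, e.g. `stream.keyBy(0).flatMap(new CountPerKey())`; on a non-keyed stream the getState() call fails at runtime.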
#### 4). Forms of state

- Both Keyed State and Operator State can exist in two forms: raw state and managed state.
- managed (managed state):
  - state managed by the Flink framework, such as ValueState, ListState, MapState, ...
  - updated and managed through the interfaces provided by the framework
  - no manual serialization needed
- raw (raw state):
  - a concrete data structure managed by the user; at checkpoint time Flink reads and writes the state content as byte[] and knows nothing about its internal structure
  - needs serialization
- For state used on a DataStream, managed state is recommended; raw state is used when implementing custom operators.
- Most state is managed state, unless you implement something custom yourself.
## 3. Flink checkpoints

### 1). State fault tolerance

- Once there is state, it needs fault tolerance, otherwise the state is pointless.
- Flink's mechanism for state fault tolerance is the checkpoint.

### 2). State fault tolerance illustrated

![img](https://myblog-1258908231.cos.ap-shanghai.myqcloud.com/hexo/20210702040021.jpg)

### 3). State fault tolerance illustrated (checkpoint)

![img](https://myblog-1258908231.cos.ap-shanghai.myqcloud.com/hexo/20210702040019.jpg)

### 4). State fault tolerance illustrated (restore)

- restore all state
- reset the source position (e.g. the Kafka offset)

![img](https://myblog-1258908231.cos.ap-shanghai.myqcloud.com/hexo/20210702040017.jpg)

### 5). What is checkpointing?

- Concept
  - A checkpoint takes, at some point in time, a snapshot of the state of all tasks and stores it in the state backend.
  - It is a mechanism for continuously (periodically) drawing a consistent picture of the streaming state; it ensures that even after a failure the program's state eventually provides exactly-once semantics for every record in the stream (this is only guaranteed inside Flink; for sinks and sources the external systems must cooperate).
  - It is a global snapshot that persists the state of all tasks/operators.
  - It is a serialized data set.
  - Note: a switch allows at-least-once semantics instead.
  - Note: checkpoints are implemented with distributed snapshots; unless stated otherwise, snapshot, checkpoint and back-up mean the same thing here.
  - Note: do not confuse State with Checkpointing.
- Characteristics:
  - a lightweight fault-tolerance mechanism
  - can run asynchronously
  - full vs. incremental
  - on failure the job is rolled back to the last successful checkpoint (automatically)
  - periodic (no manual intervention needed)

## 4. Using Checkpointing and State

**Enabling checkpointing**

### 1). How to enable checkpointing

- Checkpointing is disabled by default.
- Note: iterative jobs currently do not support checkpointing.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

env.getCheckpointConfig().setFailOnCheckpointingErrors(true);

### 2). Checkpointing advanced options: the checkpointing mode

- CheckpointingMode.EXACTLY_ONCE
- CheckpointingMode.AT_LEAST_ONCE
- How to choose: normally use EXACTLY_ONCE, unless the scenario demands extremely low latency (a few milliseconds).
- Note: for end-to-end EXACTLY_ONCE the source and the sink must also guarantee EXACTLY_ONCE.

// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

### 3). Checkpointing advanced options: retention policy

- By default checkpoints are not retained and are only used to recover a job from failures. Externalized (persistent) checkpoints can be enabled together with a retention policy:
  - ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: keep the checkpoint when the job is cancelled; note that in this case you have to clean up the checkpoint state manually after cancellation.
  - ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: delete the checkpoint when the job is cancelled; the checkpoint state is then only available if the job fails.

// enable externalized checkpoints which are retained after cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

### 4). Other advanced checkpointing options

- Checkpoint timeout: a checkpoint that does not finish within this time is aborted.

  // checkpoints have to complete within one minute, or are discarded
  env.getCheckpointConfig().setCheckpointTimeout(60000);

- Minimum pause between checkpoints: how long to wait after one checkpoint completes before the next one may be triggered; when this is set, maxConcurrentCheckpoints is effectively 1.

  // make sure 500 ms of progress happen between checkpoints
  env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

- maxConcurrentCheckpoints: how many checkpoints may be in progress at the same time (it is 1 when a minimum pause is configured).

  // allow only one checkpoint to be in progress at the same time
  env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

- failOnCheckpointingErrors: whether the task should fail when checkpointing throws an exception; defaults to true. If set to false, the task declines the checkpoint and keeps running.

  env.getCheckpointConfig().setFailOnCheckpointingErrors(true);

Note how enabling checkpointing affects the (no restart) restart strategy:

- If checkpointing is not enabled, nothing is recovered.
- If checkpointing is enabled but no restart strategy is configured, the fixed-delay strategy is used with Integer.MAX_VALUE restart attempts.
#### 1. Using Operator State, option 1: implement CheckpointedFunction

- A stateful function (RichFunction) that implements the CheckpointedFunction interface must implement two methods:
  - void snapshotState(FunctionSnapshotContext context) throws Exception
    Called when a checkpoint is taken; typically used to move data between raw state and managed state.
  - void initializeState(FunctionInitializationContext context) throws Exception (initialization and recovery logic)
    Called when the stateful function is first initialized,
    and when the stateful function is restored from an earlier checkpoint.

Example code (the CheckpointedFunction source):

package cn.zhanghub.state;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.List;

public class FileCountryDictSourceOperatorStateCheckpointedFunction implements SourceFunction<String>, CheckpointedFunction {

  1. private String md5 = null;
  2. private ListState<String> ls = null;
  3. private Boolean isCancel = true;
  4. private Integer interval = 1000;
  5. @Override
  6. public void run(SourceContext<String> ctx) throws Exception {
  7. Path pathString = new Path("hdfs://ns1/user/qingniu/country_data");
  8. Configuration hadoopConf = new Configuration();
  9. FileSystem fs = FileSystem.get(hadoopConf);
  10. while (isCancel) {
  11. if(!fs.exists(pathString)){
  12. Thread.sleep(interval);
  13. continue;
  14. }
  15. System.out.println(md5);
  16. FileChecksum fileChecksum = fs.getFileChecksum(pathString);
  17. String md5Str = fileChecksum.toString();
  18. String currentMd5 = md5Str.substring(md5Str.indexOf(":") + 1);
  19. if (!currentMd5.equals(md5)) {
  20. FSDataInputStream open = fs.open(pathString);
  21. BufferedReader reader = new BufferedReader(new InputStreamReader(open));
  22. String line = reader.readLine();
  23. while (line != null) {
  24. ctx.collect(line);
  25. line = reader.readLine();
  26. }
  27. reader.close();
  28. md5 = currentMd5;
  29. }
  30. Thread.sleep(interval);
  31. }
  32. }
  33. @Override
  34. public void cancel() {
  35. isCancel = false;
  36. }
  37. @Override
  38. public void snapshotState(FunctionSnapshotContext context) throws Exception {
  39. ls.clear();
  40. ls.add(md5);
  41. System.out.println("snapshotState");
  42. }
  43. @Override
  44. public void initializeState(FunctionInitializationContext context) throws Exception {
  45. ListStateDescriptor<String> lsd = new ListStateDescriptor<String>("md5",String.class);
  46. ls = context.getOperatorStateStore().getListState(lsd);
  47. if (context.isRestored()){
  48. Iterable<String> strings = ls.get();
  49. String next = strings.iterator().next();
  50. md5 = next;
  51. }
  52. }

}

The driver program:

package cn.zhanghub.state;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

public class FileSourceOperatorStateCheckpointed {

  1. public static void main(String[] args) throws Exception {
  2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3. env.enableCheckpointing(1000);
  4. CheckpointConfig checkpointConfig = env.getCheckpointConfig();
  5. // checkpointing mode: EXACTLY_ONCE
  6. checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  7. // minimum pause between two checkpoints, so they never overlap
  8. checkpointConfig.setMinPauseBetweenCheckpoints(2000L);
  9. // timeout for each checkpoint
  10. checkpointConfig.setCheckpointTimeout(20000L);
  11. // whether the job should fail when a checkpoint fails
  12. checkpointConfig.setFailOnCheckpointingErrors(true);
  13. // whether checkpoint data is kept when the job is cancelled
  14. checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  15. // restart strategy
  16. env.setRestartStrategy(
  17. RestartStrategies.fixedDelayRestart(
  18. 3, // number of restart attempts
  19. Time.of(0, TimeUnit.SECONDS) // delay
  20. )
  21. );
  22. DataStreamSource<String> stringDataStreamSource = env.addSource(new FileCountryDictSourceOperatorStateCheckpointedFunction());
  23. stringDataStreamSource.map(new MapFunction<String, String>() {
  24. @Override
  25. public String map(String value) throws Exception {
  26. if(value.contains("中国")){
  27. int a = 1/0;
  28. }
  29. return value;
  30. }
  31. }).print();
  32. env.execute();
  33. }

}

#### 2. Using Operator State, option 2: implement ListCheckpointed

(The interface itself already carries a ListState.)

- A stateful function (RichFunction) implements the ListCheckpointed interface; only the ListState redistribution scheme is available.
- Two methods must be implemented:
  - List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
    Called when a checkpoint is taken; the returned value is treated as a ListState (java.util.List -> ListState).
  - void restoreState(List<T> state) throws Exception;
    The argument is the ListState written by snapshotState, converted back to a java.util.List, so the restored data is directly available inside this method.
    Called when the stateful function is restored from an earlier checkpoint.

Example code (the ListCheckpointed source):

package cn.zhanghub.state;

import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

public class FileCountryDictSourceOperatorStateListCheckpointedFunction implements SourceFunction<String>, ListCheckpointed<String> {

  1. private String md5 = null;
  2. private Boolean isCancel = true;
  3. private Integer interval = 1000;
  4. @Override
  5. public void run(SourceContext<String> ctx) throws Exception {
  6. Path pathString = new Path("hdfs://ns1/user/qingniu/country_data");
  7. Configuration hadoopConf = new Configuration();
  8. FileSystem fs = FileSystem.get(hadoopConf);
  9. while (isCancel) {
  10. if (!fs.exists(pathString)) {
  11. Thread.sleep(interval);
  12. continue;
  13. }
  14. System.out.println(md5);
  15. FileChecksum fileChecksum = fs.getFileChecksum(pathString);
  16. String md5Str = fileChecksum.toString();
  17. String currentMd5 = md5Str.substring(md5Str.indexOf(":") + 1);
  18. if (!currentMd5.equals(md5)) {
  19. FSDataInputStream open = fs.open(pathString);
  20. BufferedReader reader = new BufferedReader(new InputStreamReader(open));
  21. String line = reader.readLine();
  22. while (line != null) {
  23. ctx.collect(line);
  24. line = reader.readLine();
  25. }
  26. reader.close();
  27. md5 = currentMd5;
  28. }
  29. Thread.sleep(interval);
  30. }
  31. }
  32. @Override
  33. public void cancel() {
  34. isCancel = false;
  35. }
  36. @Override
  37. public List<String> snapshotState(long checkpointId, long timestamp) throws Exception {
  38. List<String> list = new ArrayList<>();
  39. list.add(md5);
  40. System.out.println("snapshotState");
  41. return list;
  42. }
  43. @Override
  44. public void restoreState(List<String> state) throws Exception {
  45. md5 = state.get(0);
  46. }

}

The driver program:

package cn.zhanghub.state;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

public class FileSourceOperatorStateListCheckpointed {

  1. public static void main(String[] args) throws Exception {
  2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3. env.enableCheckpointing(1000);
  4. CheckpointConfig checkpointConfig = env.getCheckpointConfig();
  5. // checkpointing mode: EXACTLY_ONCE
  6. checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  7. // minimum pause between two checkpoints, so they never overlap
  8. checkpointConfig.setMinPauseBetweenCheckpoints(2000L);
  9. // timeout for each checkpoint
  10. checkpointConfig.setCheckpointTimeout(20000L);
  11. // whether the job should fail when a checkpoint fails
  12. checkpointConfig.setFailOnCheckpointingErrors(true);
  13. // whether checkpoint data is kept when the job is cancelled
  14. checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// restart strategy (commented out in this example)
// env.setRestartStrategy(
//         RestartStrategies.fixedDelayRestart(
//                 3,                              // number of restart attempts
//                 Time.of(0, TimeUnit.SECONDS)    // delay
//         )
// );

  1. DataStreamSource<String> stringDataStreamSource = env.addSource(new FileCountryDictSourceOperatorStateListCheckpointedFunction());
  2. stringDataStreamSource.map(new MapFunction<String, String>() {
  3. @Override
  4. public String map(String value) throws Exception {
  5. if(value.contains("中国")){
  6. int a = 1/0;
  7. }
  8. return value;
  9. }
  10. }).print();
  11. env.execute();
  12. }

}

**2. Using Keyed State:**

##### 1. Keyed State: expiration (TTL)

- Because the amount of keyed state can grow very large, Flink provides a TTL setting for keyed state.
- A TTL can be configured for any type of keyed state. If a TTL is configured and a state value has expired, it is cleaned up on a best-effort basis.
- All state collections support per-entry TTLs, i.e. the entries of a list or map expire independently.
- Usage:

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.seconds(1))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();

ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

- Refresh policy (default OnCreateAndWrite): controls when the last-access timestamp of the keyed state is refreshed:
  - StateTtlConfig.UpdateType.Disabled - TTL disabled, never expires
  - StateTtlConfig.UpdateType.OnCreateAndWrite - every write operation refreshes the last-access time (create and update)
  - StateTtlConfig.UpdateType.OnReadAndWrite - every read and write operation refreshes the last-access time
- State visibility (default NeverReturnExpired): whether a value that has expired but has not yet been cleaned up may still be returned when accessed:
  - StateTtlConfig.StateVisibility.NeverReturnExpired - never return expired state
  - StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - expired but not-yet-cleaned-up values may be returned
- TTL time characteristic:
  - setTimeCharacteristic(TimeCharacteristic timeCharacteristic)
  - currently only ProcessingTime is supported

##### 2. Keyed State: cleaning up expired state

- Cleanup strategies (a configuration sketch follows this list):
  - Default: expired entries are only removed when they are explicitly read (which can make the state keep growing; later versions improve on this).
  - FULL_STATE_SCAN_SNAPSHOT: expired state is dropped from the full snapshot at checkpoint time:
    - cleanupFullSnapshot()
    - not applicable to incremental checkpointing on the RocksDB state backend
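A minimal configuration sketch (my own illustration, not the article's code) of the full-snapshot cleanup mentioned above:

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.seconds(100))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        // drop expired entries whenever a full checkpoint snapshot is taken
        // (ignored by incremental checkpoints on the RocksDB state backend)
        .cleanupFullSnapshot()
        .build();
```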
##### 3. Keyed State TTL caveats

- Enabling TTL increases the storage consumption of the state backend.
- Restoring state that was written without TTL into a descriptor with TTL enabled (or vice versa) breaks compatibility and leads to a StateMigrationException.
- The TTL configuration is not part of checkpoints or savepoints.

Example code:

package cn.zhanghub.state;

import cn.zhanghub.source.FileCountryDictSourceFunction;
import cn.zhanghub.source.HainiuKafkaRecord;
import cn.zhanghub.source.HainiuKafkaRecordSchema;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class CountryCodeConnectKeyByKeyedState {

  1. public static void main(String[] args) throws Exception {
  2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3. env.enableCheckpointing(1000);
  4. CheckpointConfig checkpointConfig = env.getCheckpointConfig();
  5. // checkpointing mode: EXACTLY_ONCE
  6. checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  7. // minimum pause between two checkpoints, so they never overlap
  8. checkpointConfig.setMinPauseBetweenCheckpoints(2000L);
  9. // timeout for each checkpoint
  10. checkpointConfig.setCheckpointTimeout(20000L);
  11. // whether the job should fail when a checkpoint fails
  12. checkpointConfig.setFailOnCheckpointingErrors(true);
  13. // whether checkpoint data is kept when the job is cancelled
  14. checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  15. // restart strategy
  16. env.setRestartStrategy(
  17. RestartStrategies.fixedDelayRestart(
  18. 3, // number of restart attempts
  19. Time.of(0, TimeUnit.SECONDS) // delay
  20. )
  21. );
  22. DataStreamSource<String> countryDictSource = env.addSource(new FileCountryDictSourceFunction());
  23. Properties kafkaConsumerProps = new Properties();
  24. kafkaConsumerProps.setProperty("bootstrap.servers", "s1.hadoop:9092,s2.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
  25. kafkaConsumerProps.setProperty("group.id", "qingniuflink");
  26. kafkaConsumerProps.setProperty("flink.partition-discovery.interval-millis", "30000");
  27. FlinkKafkaConsumer010<HainiuKafkaRecord> kafkaSource = new FlinkKafkaConsumer010<>("flink_event", new HainiuKafkaRecordSchema(), kafkaConsumerProps);
  28. // kafkaSource.setStartFromEarliest()
  29. // kafkaSource.setStartFromGroupOffsets()
  30. kafkaSource.setStartFromLatest();
  31. DataStreamSource<HainiuKafkaRecord> kafkainput = env.addSource(kafkaSource);
  32. KeyedStream<Tuple2<String, String>, String> countryDictKeyBy = countryDictSource.map(new MapFunction<String, Tuple2<String, String>>() {
  33. @Override
  34. public Tuple2<String, String> map(String value) throws Exception {
  35. String[] split = value.split("\t");
  36. return Tuple2.of(split[0], split[1]);
  37. }
  38. }).keyBy(new KeySelector<Tuple2<String, String>, String>() {
  39. @Override
  40. public String getKey(Tuple2<String, String> value) throws Exception {
  41. return value.f0;
  42. }
  43. });
  44. KeyedStream<HainiuKafkaRecord, String> record = kafkainput.keyBy(new KeySelector<HainiuKafkaRecord, String>() {
  45. @Override
  46. public String getKey(HainiuKafkaRecord value) throws Exception {
  47. return value.getRecord();
  48. }
  49. });
  50. ConnectedStreams<Tuple2<String, String>, HainiuKafkaRecord> connect = countryDictKeyBy.connect(record);
  51. SingleOutputStreamOperator<String> connectInput = connect.process(new KeyedCoProcessFunction<String, Tuple2<String, String>, HainiuKafkaRecord, String>() {
  52. private MapState<String,String> map = null;
  53. @Override
  54. public void open(Configuration parameters) throws Exception {
  55. // TTL policy for the keyed state
  56. StateTtlConfig ttlConfig = StateTtlConfig
  57. // the keyed state expires after 100 seconds
  58. .newBuilder(Time.seconds(100))
  59. // restart the timer whenever the entry is created or updated
  60. .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  61. // do not return the value of expired keyed state
  62. //.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  63. // return expired values as long as they have not been cleaned up yet
  64. .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
  65. // the TTL time characteristic currently only supports ProcessingTime
  66. .setTimeCharacteristic(StateTtlConfig.TimeCharacteristic.ProcessingTime)
  67. .build();
  68. // obtain the state kept across checkpoints from the runtime context
  69. MapStateDescriptor<String,String> msd = new MapStateDescriptor<String, String>("map",String.class,String.class);
  70. msd.enableTimeToLive(ttlConfig);
  71. map = getRuntimeContext().getMapState(msd);
  72. }
  73. @Override
  74. public void processElement1(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
  75. map.put(ctx.getCurrentKey(), value.f1);
  76. out.collect(value.toString());
  77. }
  78. @Override
  79. public void processElement2(HainiuKafkaRecord value, Context ctx, Collector<String> out) throws Exception {
  80. for(Map.Entry<String,String> m:map.entries()){
  81. System.out.println(m.getKey());
  82. System.out.println(m.getValue());
  83. }
  84. if(value.getRecord().equals("CN")){
  85. int a = 1/0;
  86. }
  87. String countryCode = ctx.getCurrentKey();
  88. String countryName = map.get(countryCode);
  89. String outStr = countryName == null ? "no match" : countryName;
  90. out.collect(outStr);
  91. }
  92. });
  93. connectInput.print();
  94. env.execute();
  95. }

}

**3. Using Broadcast State:**

The previous programs joined the two streams by distributing the data with keyBy, and that shuffle can cause data skew. How can the join be done without a shuffle? By using broadcast state, which plays roughly the same role as Spark's broadcast variables.

1). Built for a special scenario

- Special scenario: some data from one stream needs to be broadcast to all downstream tasks, where it is stored locally and used while processing all incoming elements of the other stream. For example, a low-throughput stream carries a set of rules that we want to apply to every element of another stream.
- Typical applications:
  - regular event stream .connect(rule stream)
  - regular stream .connect(configuration stream)

2). The Broadcast State recipe (three steps)

- Create the regular event stream: DataStream / KeyedStream.
- Create the BroadcastStream: build the rule / configuration stream (low throughput) and broadcast it.
- Connect the two streams into a BroadcastConnectedStream and implement the processing with
  - process(BroadcastProcessFunction or KeyedBroadcastProcessFunction).

3). BroadcastProcessFunction

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}

- processElement(...): handles the elements of the non-broadcast stream; it can match them against the broadcast state.
- processBroadcastElement(...): handles the elements of the broadcast stream (e.g. rules); typically they are put into state (MapState) so that processElement can use them (the rules) while processing the business data.
- Difference between ReadOnlyContext and Context:
  - ReadOnlyContext has read-only access to the broadcast state
  - Context has read/write access

First, a bad case.
Example code: a SourceFunction that emits a Map.

package cn.zhanghub.source;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

public class FileCountryDictSourceMapFunction implements SourceFunction<Map<String, String>> {

  1. private String md5 = null;
  2. private Boolean isCancel = true;
  3. private Integer interval = 10000;
  4. @Override
  5. public void run(SourceContext<Map<String,String>> ctx) throws Exception {
  6. Path pathString = new Path("hdfs://ns1/user/qingniu/country_data");
  7. Configuration hadoopConf = new Configuration();
  8. FileSystem fs = FileSystem.get(hadoopConf);
  9. while (isCancel) {
  10. if(!fs.exists(pathString)){
  11. Thread.sleep(interval);
  12. continue;
  13. }
  14. FileChecksum fileChecksum = fs.getFileChecksum(pathString);
  15. String md5Str = fileChecksum.toString();
  16. String currentMd5 = md5Str.substring(md5Str.indexOf(":") + 1);
  17. if (!currentMd5.equals(md5)) {
  18. FSDataInputStream open = fs.open(pathString);
  19. BufferedReader reader = new BufferedReader(new InputStreamReader(open));
  20. String line = reader.readLine();
  21. Map<String,String> map = new HashMap<>();
  22. while (line != null) {
  23. String[] split = line.split("\t");
  24. map.put(split[0],split[1]);
  25. line = reader.readLine();
  26. }
  27. ctx.collect(map);
  28. reader.close();
  29. md5 = currentMd5;
  30. }
  31. Thread.sleep(interval);
  32. }
  33. }
  34. @Override
  35. public void cancel() {
  36. isCancel = false;
  37. }

}

The driver program:

package cn.zhanghub.operator;

import cn.zhanghub.source.FileCountryDictSourceMapFunction;
import cn.zhanghub.source.HainiuKafkaRecord;
import cn.zhanghub.source.HainiuKafkaRecordSchema;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class CountryCodeConnectMap {

  1. public static void main(String[] args) throws Exception {
  2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3. // must be set, otherwise the dictionary and the event records can end up in different parallel instances and never match
  4. env.setParallelism(1);
  5. DataStreamSource<Map<String, String>> countryDictSource = env.addSource(new FileCountryDictSourceMapFunction());
  6. Properties kafkaConsumerProps = new Properties();
  7. kafkaConsumerProps.setProperty("bootstrap.servers", "s1.hadoop:9092,s2.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
  8. kafkaConsumerProps.setProperty("group.id", "qingniuflink");
  9. kafkaConsumerProps.setProperty("flink.partition-discovery.interval-millis", "30000");
  10. FlinkKafkaConsumer010<HainiuKafkaRecord> kafkaSource = new FlinkKafkaConsumer010<>("flink_event", new HainiuKafkaRecordSchema(), kafkaConsumerProps);
  11. // kafkaSource.setStartFromEarliest()
  12. // kafkaSource.setStartFromGroupOffsets()
  13. kafkaSource.setStartFromLatest();
  14. DataStreamSource<HainiuKafkaRecord> kafkainput = env.addSource(kafkaSource);
  15. ConnectedStreams<Map<String, String>, HainiuKafkaRecord> connect = countryDictSource.connect(kafkainput);
  16. SingleOutputStreamOperator<String> connectInput = connect.process(new CoProcessFunction<Map<String, String>, HainiuKafkaRecord, String>() {
  17. private Map<String, String> map = new HashMap<String, String>();
  18. @Override
  19. public void processElement1(Map<String, String> value, Context ctx, Collector<String> out) throws Exception {
  20. for (Map.Entry<String, String> entry : value.entrySet()) {
  21. map.put(entry.getKey(), entry.getValue());
  22. }
  23. out.collect(value.toString());
  24. }
  25. @Override
  26. public void processElement2(HainiuKafkaRecord value, Context ctx, Collector<String> out) throws Exception {
  27. String countryCode = value.getRecord();
  28. String countryName = map.get(countryCode);
  29. String outStr = countryName == null ? "no match" : countryName;
  30. out.collect(outStr);
  31. }
  32. });
  33. connectInput.print();
  34. env.execute();
  35. }

}

Optimizing it with broadcast state:

Example code:

package cn.zhanghub.state;

import cn.zhanghub.source.FileCountryDictSourceMapFunction;
import cn.zhanghub.source.HainiuKafkaRecord;
import cn.zhanghub.source.HainiuKafkaRecordSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class CountryCodeConnectMapBroadCast {

  1. private static final MapStateDescriptor<String, String> msd = new MapStateDescriptor<>("countryCodeMap", String.class, String.class);
  2. public static void main(String[] args) throws Exception {
  3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4. env.enableCheckpointing(1000);
  5. CheckpointConfig checkpointConfig = env.getCheckpointConfig();
  6. // checkpointing mode: EXACTLY_ONCE
  7. checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  8. // minimum pause between two checkpoints, so they never overlap
  9. checkpointConfig.setMinPauseBetweenCheckpoints(2000L);
  10. // timeout for each checkpoint
  11. checkpointConfig.setCheckpointTimeout(20000L);
  12. // whether the job should fail when a checkpoint fails
  13. checkpointConfig.setFailOnCheckpointingErrors(true);
  14. // whether checkpoint data is kept when the job is cancelled
  15. checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  16. // state backend used for the checkpoint data (this is the default)
  17. MemoryStateBackend stateBackend = new MemoryStateBackend(10 * 1024 * 1024, false);
  18. // alternative state backends:

// FsStateBackend stateBackend = new FsStateBackend("file:///Users/leohe/Data/output/flink/checkpoints", true);

// RocksDBStateBackend stateBackend = new RocksDBStateBackend("file:///Users/leohe/Data/output/flink/checkpoints", true);

env.setStateBackend(stateBackend);

  1. // restart strategy
  2. env.setRestartStrategy(
  3. RestartStrategies.fixedDelayRestart(
  4. 3, // number of restart attempts
  5. Time.of(0, TimeUnit.SECONDS) // delay
  6. )
  7. );
  8. DataStreamSource<Map<String, String>> countryDictSource = env.addSource(new FileCountryDictSourceMapFunction());
  9. Properties kafkaConsumerProps = new Properties();
  10. kafkaConsumerProps.setProperty("bootstrap.servers", "s1.hadoop:9092,s2.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
  11. kafkaConsumerProps.setProperty("group.id", "qingniuflink");
  12. kafkaConsumerProps.setProperty("flink.partition-discovery.interval-millis", "30000");
  13. FlinkKafkaConsumer010<HainiuKafkaRecord> kafkaSource = new FlinkKafkaConsumer010<>("flink_event", new HainiuKafkaRecordSchema(), kafkaConsumerProps);
  14. // kafkaSource.setStartFromEarliest()
  15. // kafkaSource.setStartFromGroupOffsets()
  16. kafkaSource.setStartFromLatest();
  17. DataStreamSource<HainiuKafkaRecord> kafkainput = env.addSource(kafkaSource);
  18. BroadcastStream<Map<String, String>> broadcastinput = countryDictSource.broadcast(msd);
  19. BroadcastConnectedStream<HainiuKafkaRecord, Map<String, String>> broadcastConnect = kafkainput.connect(broadcastinput);
  20. SingleOutputStreamOperator<String> broadcastConnectInput = broadcastConnect.process(new BroadcastProcessFunction<HainiuKafkaRecord, Map<String, String>, String>() {
  21. //private Map<String, String> map = new HashMap<String, String>();
  22. @Override
  23. public void processElement(HainiuKafkaRecord value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
  24. String countryCode = value.getRecord();
  25. ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(msd);
  26. String countryName = broadcastState.get(countryCode);

// String countryName = map.get(countryCode);
        String outStr = countryName == null ? "no match" : countryName;
        out.collect(outStr);
    }

  1. @Override
  2. public void processBroadcastElement(Map<String, String> value, Context ctx, Collector<String> out) throws Exception {
  3. BroadcastState<String, String> broadcastState = ctx.getBroadcastState(msd);
  4. for (Map.Entry<String, String> entry : value.entrySet()) {
  5. broadcastState.put(entry.getKey(), entry.getValue());
  6. }

// for (Map.Entry<String, String> entry : value.entrySet()) {
//     map.put(entry.getKey(), entry.getValue());
// }
        out.collect(value.toString());
    }
});

  1. broadcastConnectInput.print();
  2. env.execute();
  3. }

}

Another way of using broadcast state, after a keyBy:

##### 4. KeyedBroadcastProcessFunction

- processElement(...): handles the elements of the non-broadcast stream.
- processBroadcastElement(...): handles the elements of the broadcast stream (e.g. rules); typically they are put into state so that processElement can use them (the rules) while processing the business data.
- Difference between ReadOnlyContext and Context:
  - ReadOnlyContext has read-only access to the broadcast state
  - Context has read/write access

Example code:

package cn.zhanghub.state;

import cn.zhanghub.source.FileCountryDictSourceMapFunction;
import cn.zhanghub.source.HainiuKafkaRecord;
import cn.zhanghub.source.HainiuKafkaRecordSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class CountryCodeConnectMapKeyedBroadCast {

  1. private static final MapStateDescriptor<String, String> msd = new MapStateDescriptor<>("countryCodeMap", String.class, String.class);
  2. public static void main(String[] args) throws Exception {
  3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4. env.enableCheckpointing(1000);
  5. CheckpointConfig checkpointConfig = env.getCheckpointConfig();
  5. // checkpointing mode: EXACTLY_ONCE
  6. checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  7. // minimum pause between two checkpoints, so they never overlap
  8. checkpointConfig.setMinPauseBetweenCheckpoints(2000L);
  9. // timeout for each checkpoint
  10. checkpointConfig.setCheckpointTimeout(20000L);
  11. // whether the job should fail when a checkpoint fails
  12. checkpointConfig.setFailOnCheckpointingErrors(true);
  13. // whether checkpoint data is kept when the job is cancelled
  14. checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  15. // state backend used for the checkpoint data (this is the default)
  16. MemoryStateBackend stateBackend = new MemoryStateBackend(10 * 1024 * 1024, false);
  17. // alternative state backends:

// FsStateBackend stateBackend = new FsStateBackend("file:///Users/leohe/Data/output/flink/checkpoints", true);

// RocksDBStateBackend stateBackend = new RocksDBStateBackend("file:///Users/leohe/Data/output/flink/checkpoints", true);

env.setStateBackend(stateBackend);

  1. // restart strategy
  2. env.setRestartStrategy(
  3. RestartStrategies.fixedDelayRestart(
  4. 3, // number of restart attempts
  5. Time.of(0, TimeUnit.SECONDS) // delay
  6. )
  7. );
  8. DataStreamSource<Map<String, String>> countryDictSource = env.addSource(new FileCountryDictSourceMapFunction());
  9. Properties kafkaConsumerProps = new Properties();
  10. kafkaConsumerProps.setProperty("bootstrap.servers", "s1.hadoop:9092,s2.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
  11. kafkaConsumerProps.setProperty("group.id", "qingniuflink");
  12. kafkaConsumerProps.setProperty("flink.partition-discovery.interval-millis", "30000");
  13. FlinkKafkaConsumer010<HainiuKafkaRecord> kafkaSource = new FlinkKafkaConsumer010<>("flink_event", new HainiuKafkaRecordSchema(), kafkaConsumerProps);
  14. // kafkaSource.setStartFromEarliest()
  15. // kafkaSource.setStartFromGroupOffsets()
  16. kafkaSource.setStartFromLatest();
  17. DataStreamSource<HainiuKafkaRecord> kafkainput = env.addSource(kafkaSource);
  18. KeyedStream<HainiuKafkaRecord, String> record = kafkainput.keyBy(new KeySelector<HainiuKafkaRecord, String>() {
  19. @Override
  20. public String getKey(HainiuKafkaRecord value) throws Exception {
  21. return value.getRecord();
  22. }
  23. });
  24. BroadcastStream<Map<String, String>> broadcastinput = countryDictSource.broadcast(msd);
  25. BroadcastConnectedStream<HainiuKafkaRecord, Map<String, String>> broadcastConnect = record.connect(broadcastinput);
  26. SingleOutputStreamOperator<String> broadcastConnectInput = broadcastConnect.process(new KeyedBroadcastProcessFunction<String, HainiuKafkaRecord, Map<String, String>, String>(){
  27. //private Map<String, String> map = new HashMap<String, String>();
  28. @Override
  29. public void processElement(HainiuKafkaRecord value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
  30. String countryCode = ctx.getCurrentKey();
  31. ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(msd);
  32. String countryName = broadcastState.get(countryCode);

// String countryName = map.get(countryCode);
        String outStr = countryName == null ? "no match" : countryName;
        out.collect(outStr);
    }

  1. @Override
  2. public void processBroadcastElement(Map<String, String> value, Context ctx, Collector<String> out) throws Exception {
  3. BroadcastState<String, String> broadcastState = ctx.getBroadcastState(msd);
  4. for (Map.Entry<String, String> entry : value.entrySet()) {
  5. broadcastState.put(entry.getKey(), entry.getValue());
  6. }

// for (Map.Entry<String, String> entry : value.entrySet()) {
//     map.put(entry.getKey(), entry.getValue());
// }
        out.collect(value.toString());
    }
});

  1. broadcastConnectInput.print();
  2. env.execute();
  3. }

}

Keyed state can also be combined with a TTL inside other rich functions, for example in a RichReduceFunction:
  2. public class MyReduceFunctionWithKeyedState extends RichReduceFunction<Tuple2<String, Integer>> {
  3. private ValueState<Integer> count;
  4. @Override
  5. public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
  6. count.update(t1.f1 + t2.f1);
  7. return Tuple2.of(t1.f0, count.value());
  8. }
  9. @Override
  10. public void open(Configuration parameters) throws Exception {
  11. final ValueStateDescriptor<Integer> cs = new ValueStateDescriptor<>("cs", Integer.class);
  12. /** TTL configuration */
  13. StateTtlConfig ttlConfig = StateTtlConfig
  14. .newBuilder(Time.seconds(1))
  15. .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  16. .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  17. .build();
  18. cs.enableTimeToLive(ttlConfig);
  19. count = getRuntimeContext().getState(cs);
  20. }
  21. }

Notes:

  • The order of the elements in broadcast state may differ from task to task.
  • Broadcast state is kept in memory (not in RocksDB).

## 5. How checkpointing works (a frequent interview topic)

- Barriers are injected at the sources.
- A barrier is the marker of a checkpoint.

### 1). Barriers

- Global asynchrony is the core mechanism of the snapshot.

- One of the core concepts of Flink's distributed snapshots is the barrier. Barriers are inserted into the data stream and flow downstream as part of it. A barrier does not interfere with normal records, and the data stays strictly ordered. A barrier splits the stream into two parts: one part goes into the current snapshot, the other into the next snapshot. Every barrier carries a snapshot ID, and all records before a barrier belong to that snapshot. Barriers do not interrupt stream processing and are therefore very lightweight. Barriers of several different snapshots can be in flight in the stream at the same time, i.e. several snapshots may be created concurrently.

(figure)

- Barriers are injected at the data sources. When the barrier of snapshot n is injected, the system records the current position n (denoted Sn).
- In Apache Kafka, for example, this position is the offset of the last record in a partition. The position Sn is sent to a component called the checkpoint coordinator (Flink's JobManager).
### 2). Checkpointing in a distributed setting

(figure)

In the distributed case:

- An operator takes a snapshot of its state after it has received the barriers from all of its input streams and before it emits the barrier into its output streams. At that point all state updates caused by records before the barrier are complete and nothing depends on pre-barrier records any more.
- It then inserts a barrier identifying snapshot n into all of its output streams. When a sink operator (the end of the DAG) has received barrier n from all of its input streams, it acknowledges snapshot n to the checkpoint coordinator. Once every sink has acknowledged it, the snapshot counts as complete for the whole distributed job.

Because snapshots can be very large, the backend storage is configurable. By default they are kept in the JobManager's memory, but a production system should use a reliable distributed store (for example HDFS). After the state has been stored, the operator acknowledges its checkpoint and emits the barrier into its output streams.

The snapshot now contains:

- for each parallel data source: the position/offset in the stream at the time the snapshot was taken
- for each operator: a pointer to the state stored as part of the snapshot
### 3). Barrier alignment with multiple inputs: how Flink guarantees exactly-once

(figure)

An operator that receives more than one input stream needs to align its inputs on the barriers (see the figure above):

- As soon as the operator receives barrier n from one input stream, it may not process any further records from that stream until it has received barrier n from the remaining streams as well; otherwise records belonging to snapshot n and snapshot n+1 would get mixed up.
- The streams whose barrier n has already arrived are put on hold; records received from them are placed into an input buffer.
- When barrier n has been received from the last stream, the operator emits all pending outgoing records and then emits the barrier of snapshot n itself.
- After these steps the operator resumes processing of all input streams, serving the records from the input buffers first.

### 4). Exactly-once vs. at-least-once

- Alignment gives exactly-once (the records between two barriers behave like a transaction: the job "commits" once the sinks have received all barriers n); skipping the alignment gives at-least-once.

- Flink offers a way to disable alignment at checkpoint time: the operator then snapshots as soon as it receives a barrier, without waiting for the other barriers.
- Skipping the alignment means the operator keeps processing its inputs even after a barrier has arrived, i.e. it already processes records belonging to checkpoint n+1 before checkpoint n is taken. After a failure these records are duplicated: they are contained in checkpoint n and they are also processed again afterwards.
- Note: alignment only happens at operators with multiple inputs (joins) or multiple outputs (repartitioning, stream splits). Purely parallel operations such as map(), flatMap() and filter() therefore still get exactly-once guarantees even in at-least-once mode. A small configuration sketch follows.
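A minimal sketch (my own illustration, not from the article; `env` is assumed to be the StreamExecutionEnvironment with checkpointing enabled) of the switch between the two modes:

```java
import org.apache.flink.streaming.api.CheckpointingMode;

// skip barrier alignment: lower latency under backpressure, but records may be replayed after a failure
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

// default: align barriers so that every checkpoint is an exactly-once consistent cut
// env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
```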

### 5). Prerequisites for using checkpointing

- A data source that can be rewound for a certain amount of time (so data can be re-read after a failure). Typically:

  - a durable message queue, e.g. Kafka, RabbitMQ, Amazon Kinesis, Google PubSub
  - or a file system: HDFS, S3, GFS, NFS, Ceph

- A persistent storage system for the state, usually a distributed file system (checkpointing periodically persists the entire job state there):
  - typically HDFS, S3, GFS, NFS, Ceph
- Note: if the checkpointed results are supposed to be exactly-once end to end, the external storage must also support idempotent writes or transactions,
  - typically an HBase rowkey, a Redis key, or a MySQL transaction.

So where do we store the "save file"?

## 6. State Backends

Choosing the right state backend.

### 1). What is a state backend?

- The state backend is where the snapshots are stored.
- It persists consistent snapshots of all state during checkpointing, including:
  - state not defined by the user: e.g. timers and the state of built-in stateful operators (connectors, windows)
  - user-defined state: the Keyed State and Operator State used by the user-defined stateful operators discussed earlier

### 2). Flink ships with three state backends out of the box:

#### 1). MemoryStateBackend (the default)

- The MemoryStateBackend keeps state on the Java heap. Key/value state and window operators use hash tables to store values, timers, and so on.
- At checkpoint time the MemoryStateBackend snapshots the state and sends the snapshot to the JobManager as part of the checkpoint acknowledgement, so the snapshot ends up in the JobManager's heap memory.
- The MemoryStateBackend can snapshot asynchronously (enabled by default); asynchronous snapshots are recommended to avoid blocking. To disable them, pass false to the constructor (or set it in the global configuration file):

  StateBackend backend = new MemoryStateBackend(10 * 1024 * 1024, false);
  env.setStateBackend(backend);

- Limitations
  - A single state is limited to 5 MB by default; this can be increased in the MemoryStateBackend constructor.
  - Regardless of the configuration, state can never be larger than akka.framesize (the maximum message size between JobManager and TaskManager, 10 MB by default).
  - The JobManager must have enough memory.
- When to use
  - local development and debugging
  - jobs with little state, e.g. only Map, FlatMap, Filter, ... or a Kafka consumer

#### 2). FsStateBackend

- The FsStateBackend holds the in-flight data in the TaskManager's memory. At checkpoint time it writes the state snapshot into files under a file-system directory. The file paths and other metadata are passed to the JobManager and kept in its memory (or, in HA mode, in the metadata checkpoint).
- The FsStateBackend can snapshot asynchronously (enabled by default); asynchronous snapshots are recommended to avoid blocking. To disable them, pass false to the constructor (or set it in the global configuration file):

  StateBackend backend = new FsStateBackend("hdfs://namenode:40010/flink/checkpoints", false);
  env.setStateBackend(backend);

- When to use
  - jobs with large state, long windows, or large key/value state
  - all high-availability setups
#### 3). RocksDBStateBackend

- The RocksDBStateBackend keeps the in-flight data in a RocksDB database which is (by default) stored in the TaskManager's data directories. At checkpoint time, the whole RocksDB database is checkpointed into the configured file system and directory. The file paths and other metadata are passed to the JobManager and kept in its memory (or, in HA mode, in the metadata checkpoint).
- The RocksDBStateBackend always snapshots asynchronously.

  StateBackend backend = new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints");
  env.setStateBackend(backend);

- Limitations
  - The RocksDB JNI API is based on byte[], so keys and values are limited to 2^31 bytes (2 GB) each. RocksDB itself has problems with very large values (merge operations in RocksDB, e.g. ListState).
- When to use
  - jobs with huge state, very long windows, or large key/value state
  - all high-availability setups
- Compared with the two previous backends:
  - currently only the RocksDBStateBackend supports incremental checkpoints (full checkpoints are the default)
  - the state lives in a database, so the amount of state RocksDB can hold is limited only by the available disk space; it can keep much larger state than the other backends, but at a higher cost (reads/writes have to deserialize/serialize to retrieve/store state), so throughput is limited

· RocksDBStateBackend-specific configuration:

| Config option | Default | Description |
| --- | --- | --- |
| state.backend.rocksdb.localdir | (none) | |
| state.backend.rocksdb.timer-service.factory | "HEAP" | where the timer-service state is stored: HEAP / ROCKSDB |

· Configuring the RocksDBStateBackend in code (overrides the global configuration):

  1. StateBackend backend = new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints",true);
  2. env.setStateBackend(backend);

· It requires a separate POM dependency:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
  4. <version>${flink.version}</version>
  5. </dependency>
  • By default (MemoryStateBackend): state is kept in the TaskManager's memory and checkpoints are stored in the JobManager's memory.

#### 4). State backend summary

(figure)

1). Configuring the state backend

- Global configuration (configuration file conf/flink-conf.yaml):

  # The backend that will be used to store operator state checkpoints
  state.backend: filesystem

  # Directory for storing checkpoints
  state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

- Per-job state backend (overrides the global configuration):

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

Example code:

package cn.zhanghub.state;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

public class FileSourceOperatorStateListCheckpointedStateBackend {

  1. public static void main(String[] args) throws Exception {
  2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3. env.enableCheckpointing(1000);
  4. CheckpointConfig checkpointConfig = env.getCheckpointConfig();
  5. // checkpointing mode: EXACTLY_ONCE
  6. checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  7. // minimum pause between two checkpoints, so they never overlap
  8. checkpointConfig.setMinPauseBetweenCheckpoints(2000L);
  9. // timeout for each checkpoint
  10. checkpointConfig.setCheckpointTimeout(20000L);
  11. // whether the job should fail when a checkpoint fails
  12. checkpointConfig.setFailOnCheckpointingErrors(true);
  13. // whether checkpoint data is kept when the job is cancelled
  14. checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  15. // state backend used for the checkpoint data (this is the default)
  16. MemoryStateBackend stateBackend = new MemoryStateBackend(10 * 1024 * 1024, false);
  17. // alternative state backends:

// FsStateBackend stateBackend = new FsStateBackend("file:///Users/leohe/Data/output/flink/checkpoints", true);

// RocksDBStateBackend stateBackend = new RocksDBStateBackend("file:///Users/leohe/Data/output/flink/checkpoints", true);

env.setStateBackend(stateBackend);

  1. // restart strategy
  2. env.setRestartStrategy(
  3. RestartStrategies.fixedDelayRestart(
  4. 3, // number of restart attempts
  5. Time.of(0, TimeUnit.SECONDS) // delay
  6. )
  7. );
  8. DataStreamSource<String> stringDataStreamSource = env.addSource(new FileCountryDictSourceOperatorStateListCheckpointedFunction());
  9. stringDataStreamSource.map(new MapFunction<String, String>() {
  10. @Override
  11. public String map(String value) throws Exception {
  12. if (value.contains("中国")) {
  13. int a = 1 / 0;
  14. }
  15. return value;
  16. }
  17. }).print();
  18. env.execute();
  19. }

}

##### Default global checkpointing configuration (conf/flink-conf.yaml)

| Config option | Default | Description |
| --- | --- | --- |
| state.backend | (none) | The backend used to store checkpoint state. Currently supported: 'jobmanager', 'filesystem', 'rocksdb', or the fully qualified name of a factory class, e.g. org.apache.flink.runtime.state.filesystem.FsStateBackendFactory. If not set, 'jobmanager' (MemoryStateBackend) is used. |
| state.backend.async | true | Whether the backend snapshots asynchronously. Backends that do not support asynchronous snapshots (or only support asynchronous ones) may ignore this option. |
| state.backend.fs.memory-threshold | 1024 | Size threshold for state files: state smaller than this is stored inline in the root checkpoint metadata file. |
| state.backend.incremental | false | Whether to use incremental checkpoints. Backends that do not support them ignore this option; currently only RocksDB supports incremental checkpoints. |
| state.backend.local-recovery | false | |
| state.checkpoints.dir | (none) | Directory for checkpoint data files and metadata; it must be readable and writable by all participating TaskManagers and JobManagers, e.g. hdfs://namenode-host:port/flink-checkpoints |
| state.checkpoints.num-retained | 1 | Maximum number of completed checkpoints to retain. |
| state.savepoints.dir | (none) | Default directory for savepoints, e.g. hdfs://namenode-host:port/flink-checkpoints |
| taskmanager.state.local.root-dirs | (none) | |

## 7. Savepoints
#### 1). The savepoint concept

- Concept
  - A savepoint can be understood as a special kind of checkpoint: a pointer to a checkpoint. It is also a consistent snapshot of the streaming job created through the checkpointing mechanism; it stores the source offsets and the operator state, i.e. a historical version of the processing state. It must be triggered manually, it never expires and is never overwritten unless deleted by hand. A normal production job does not need savepoints, except when major changes to the job or the cluster require a test run.
  - You can resume consumption from any point in the past at which a savepoint was taken, i.e. it gives you replay capability.
- A savepoint consists of two parts:
  - data directory: a directory on stable storage whose binary files are the snapshot of the streaming job's state
  - metadata file: pointers (absolute paths) to the data files in the data directory that belong to this savepoint
- Difference from checkpoints: a savepoint is like a backup (compare a database backup), a checkpoint is like a recovery log.
  - Checkpoints are the "recovery log" Flink creates automatically for failure recovery; they require no user interaction and are deleted when the user cancels the job, unless retention is enabled (externalized checkpoints).
  - Savepoints are created, owned and deleted by the user and survive after the job terminates.
- Uses
  - releasing a new version of the job (changed job graph, changed parallelism, ...), redeploying the application
  - Flink version upgrades
  - business or cluster migration where no data may be lost
  78. |
  79. 概念
  80. | 描述
  81. | 使用场景
  82. |
  83. | --- | --- | --- |
  84. |
  85. Checkpoint
  86. | 定期、自动的对job下的所有状态多快照并存储,会过期,仅用于从故障中恢复(重启策略)。当job cancel之后会被删除。
  87. | 应用内部restarting时使用
  88. |
  89. |
  90. External Checkpoint
  91. | 一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint处理;属于checkpoint的范畴
  92. |
  93. |
  94. |
  95. Savepoint
  96. | 用于创建的job state的“备份”,是一种特殊的checkpoint,只不过不像checkpoint定期的从系统中去触发的,它是用户通过命令触发,存储格式和checkpoint一样(代码都一样); 注意:当Checkpoint使用RocksDBStateBackend并使用增量Checkpoint时会使用RocksDB内部格式,就跟Savepoint格式不一样了
  97. | job开发新版本(更改job graph、更改并行度等等),应用重新发布 Flink版本的更新 业务迁移,集群需要迁移,不容许数据丢失
  98. |
#### 3). Assigning operator IDs

- To make future upgrades of the job easier, it is recommended to assign an ID to each operator, for example:

env
    // Stateful source (e.g. Kafka) with ID
    .addSource(new StatefulSource())
    .uid("source-id")           // ID for the source operator
    .shuffle()
    // Stateful mapper with ID
    .map(new StatefulMapper())
    .uid("mapper-id")           // ID for the mapper
    // Stateless printing sink
    .print();                   // Auto-generated ID

- If no ID is assigned, one is generated automatically. As long as the IDs do not change, the job can be restored from the given savepoint. Auto-generated IDs depend on the program structure and change when the code changes, so assigning IDs manually is the recommended practice.
- Once IDs are set, a savepoint can be imagined as a map: Operator ID -> State.
#### 4). Triggering a savepoint

- Trigger a savepoint directly (think of it as taking a database backup):

  $ bin/flink savepoint :jobId [:targetDirectory]

- Trigger a savepoint (Flink on YARN):

  $ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId

- Cancel a job with a savepoint:

  $ bin/flink cancel -s [:targetDirectory] :jobId

#### 5). Restoring a job from a savepoint

- Restore a job from a given savepoint:

  $ bin/flink run -s :savepointPath [:runArgs]

- Restore from a given savepoint, allowing non-restored state (e.g. an operator was removed):

  $ bin/flink run -s :savepointPath -n [:runArgs]

#### 6). Deleting a savepoint

- Delete a savepoint:

  $ bin/flink savepoint -d :savepointPath

- Note: a savepoint can also be removed with ordinary file-system operations (without affecting other savepoints or checkpoints).
## 8. State redistribution

**Redistribution of Operator State and Keyed State**

### 1). Operator State redistribution

· Redistribute: when the parallelism of an operator changes (rescaling), its operator state is redistributed, i.e. the data in the operator state is reassigned to the operator's task instances.
· Example: an operator's parallelism is changed from 3 to 2.

![img](https://myblog-1258908231.cos.ap-shanghai.myqcloud.com/hexo/20210702035937.jpg)

- The different data structures are redistributed in different ways (a sketch of how the scheme is chosen follows this list):
  - ListState: on rescaling, the list of every parallel instance is taken, all lists are merged into one list, and the elements are then distributed evenly across the new tasks by element count.
  - UnionListState: more flexible than ListState, leaving the splitting to the user. On rescaling the original lists are concatenated and handed over without splitting: every task receives the full state and picks out what it needs.
  - BroadcastState: for example, when joining a big table with a small table, the small table can simply be broadcast to the partitions of the big table, so the data on every parallel instance is identical and receives the same updates; on rescaling the data is simply copied to the new tasks. These are the three redistribution schemes Flink offers for operator state; pick the one that fits your needs.
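A minimal sketch (my own illustration, not the article's code; `context` is the FunctionInitializationContext passed to CheckpointedFunction.initializeState(), and the descriptor name is made up) of how the scheme is chosen when the state is obtained:

```java
ListStateDescriptor<Long> descriptor = new ListStateDescriptor<>("offsets", Long.class);

// ListState: on rescaling, the merged entries are split evenly across the new parallel instances
ListState<Long> splitState = context.getOperatorStateStore().getListState(descriptor);

// UnionListState: on rescaling, every parallel instance receives the full concatenated list
ListState<Long> unionState = context.getOperatorStateStore().getUnionListState(descriptor);
```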
### 2). Keyed State redistribution

- Keyed State redistribution:
  - a key's keyed state is redistributed to whatever task the key itself is redistributed to
  - keyed state is redistributed based on key groups:
    - the keys are partitioned into groups
    - every key is assigned to exactly one group
    - the groups are assigned to the task instances
    - both the group ID and the task ID are counted from 0
    - hash = hash(key)
    - KG = hash % numOfKeyGroups
    - subtask = KG * taskNum / numOfKeyGroups

    where numOfKeyGroups is the number of key groups, taskNum is the number of tasks, KG is the group ID (from 0) and subtask is the task ID (from 0). A small worked example follows the figure.

![img](https://myblog-1258908231.cos.ap-shanghai.myqcloud.com/hexo/20210702035932.jpg)

How does a CheckpointedFunction choose the redistribution scheme? The CheckpointedFunction interface gives access to operator state under the different redistribution schemes; the scheme is selected when the state is obtained, via getListState / getUnionListState (note: with a different scheme you also have to write the matching logic for reading back the restored data).
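A toy calculation (my own sketch; simplified, since Flink additionally murmur-hashes key.hashCode() internally) showing how the formulas above map a key to a key group and then to a subtask:

```java
int numOfKeyGroups = 128;   // number of key groups (the maximum parallelism)
int taskNum = 4;            // operator parallelism
String key = "CN";

int hash = Math.abs(key.hashCode());
int kg = hash % numOfKeyGroups;                  // key group id, counted from 0
int subtask = kg * taskNum / numOfKeyGroups;     // subtask id, counted from 0

System.out.printf("key=%s -> keyGroup=%d -> subtask=%d%n", key, kg, subtask);
```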
## 9. Fault tolerance of the Kafka source and sink

### 1. FlinkKafkaConsumer fault tolerance

#### 1). Understanding the fault tolerance of the Flink Kafka source (it determines where consumption resumes after a failure)

![img](../images/%E5%A4%A7%E6%95%B0%E5%89%A7-flink-%E9%AB%98%E7%BA%A7%E7%BC%96%E7%A8%8BA/clip_image006-1610996283405.gif)

- If checkpointing is enabled in Flink, the Flink Kafka Consumer periodically checkpoints its Kafka offsets into the snapshot,
- by implementing CheckpointedFunction,
- with a ListState<Tuple2<KafkaTopicPartition, Long>>,
- which guarantees exactly-once consumption.
- If the job fails, Flink restores the program to the state of the latest checkpoint and re-consumes the Kafka records starting from the offsets stored in the checkpoint. (In this case the start-up consumption strategy described earlier no longer decides the starting position, because a failure occurred.)

#### 2). Flink Kafka Consumer offset commit behaviour

| Situation | Offset commit behaviour |
| --- | --- |
| Checkpointing disabled | The Flink Kafka Consumer relies on the automatic periodic offset commits of the underlying Kafka client. To disable or enable them, set enable.auto.commit (auto.commit.enable for Kafka 0.8) / auto.commit.interval.ms in the Kafka client Properties. |
| Checkpointing enabled | The offsets are saved in the snapshot at checkpoint time. When a checkpoint completes, the Flink Kafka Consumer commits the offsets from the snapshot to Kafka/ZooKeeper, which keeps the committed offsets consistent with the offsets in the checkpoint state. This commit can be disabled or enabled by calling setCommitOffsetsOnCheckpoints(boolean) on the consumer (default: true). In this mode the automatic periodic offset commit settings in the Properties are ignored completely. |

A small code sketch of that switch follows.
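A minimal sketch (my addition, reusing the consumer from the earlier examples) of the commit switch mentioned above:

```java
FlinkKafkaConsumer010<HainiuKafkaRecord> kafkaSource =
        new FlinkKafkaConsumer010<>("flink_event", new HainiuKafkaRecordSchema(), kafkaConsumerProps);

// when a checkpoint completes, commit the checkpointed offsets back to Kafka/ZooKeeper (default: true);
// recovery itself always uses the offsets stored in the checkpoint, not the committed ones
kafkaSource.setCommitOffsetsOnCheckpoints(true);
```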
#### 3). Where consumption starts in different situations

| Situation | What decides the starting position |
| --- | --- |
| First start, no savepoint (the normal case) | the configured consumption/start-up mode |
| Started from a savepoint (application upgrade, e.g. increasing the parallelism) | the offsets recorded in the savepoint |
| Job recovery after a failure, with checkpointing | the offsets recorded in the checkpoint snapshot |
| Job recovery after a failure, without checkpointing | the configured consumption/start-up mode |
### 2. FlinkKafkaProducer fault tolerance

| Version | Fault-tolerance guarantee |
| --- | --- |
| Kafka 0.8 | at most once (data may be lost) |
| Kafka 0.9 / 0.10 | With checkpointing enabled, at-least-once by default (duplicates possible). setLogFailuresOnly(boolean), default false (false keeps at-least-once): whether a failed write to Kafka is only logged. false: do not just log, throw the exception so the application restarts (at-least-once); true: only log it (data may be lost). setFlushOnCheckpoint(boolean), default true (true keeps at-least-once): whether a Flink checkpoint waits for the acks of the records still in flight to Kafka. |
| Kafka 0.11 | Checkpointing must be enabled. The fault-tolerance semantic is chosen via a constructor argument: Semantic.NONE: may lose and may duplicate data; Semantic.AT_LEAST_ONCE: no loss, but duplicates possible (default); Semantic.EXACTLY_ONCE: exactly-once using Kafka transactions. |

A hedged sketch of selecting the Kafka 0.11 semantic follows; after it, the original 0.9/0.10 example shows the setLogFailuresOnly / setFlushOnCheckpoint switches.
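This sketch is my own addition (it assumes the flink-connector-kafka-0.11 dependency, enabled checkpointing, and reuses producerPropsSns from the example below):

```java
FlinkKafkaProducer011<String> kafkaOut011 = new FlinkKafkaProducer011<>(
        "flink_event_result",
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        producerPropsSns,
        FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);   // or NONE / AT_LEAST_ONCE (the default)
```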

Properties producerPropsSns = new Properties();
producerPropsSns.setProperty("bootstrap.servers", "s1.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
producerPropsSns.setProperty("retries", "3");
// The FlinkKafkaProducer010 constructor also accepts a custom Kafka partitioner
FlinkKafkaProducer010<String> kafkaOut = new FlinkKafkaProducer010<>("flink_event_result",
        new SimpleStringSchema(), producerPropsSns, new HainiuFlinkPartitioner());
kafkaOut.setLogFailuresOnly(false);
kafkaOut.setFlushOnCheckpoint(true);
