• Spark streaming 是微批处理,延迟秒级。
  • flink四大基石 :窗口 时间及水位线 状态 检查点
  • flink序列化框架直接操作二进制

image.png

  • JobManager将JobGraph转换为数据流 图,包含所有可以并发执行的任务。

    Opertator Chains

  • disableChaining()

  • startNewChain() 不与上游合并
  • slotSharingGroup():设置group,不同group不共享slot

    Source

    自定义Source:flinkdemo/FileCountryDictSourceFunction.java at main · justforword/flinkdemo

    kafka Source

    image.png
    动态Topic discovery

  • Flink Kafka Consumer支持动态发现Kafka Topic,仅限通过正则表达式指定topic的方式

  • 默认禁止动态发现topic,把flink.partition-discovery.interval-millis设置大于0即可启用

image.png

Transformation

image.png

  • windowAll():所有数据在一个window跑

    operators

    connect & union

  • connect之后生成ConnectedStreams,会对两个流的数据应用不同的处理方法,并且双流之间可以共享状态(比如计数)。这在第一个流的输入会影响第二个流时, 会非常有用。union 合并多个流,新的流包含所有流的数据。

  • union是DataStream -> DataStream
  • connect只能连接两个流,而union可以连接多于两个流
  • connect连接的两个流类型可以不一致,而union连接的流的类型必须一致

    connect【各流处理各的】

    image.png

    keyby

    转换关系: DataStream -> KeyedStream

    keyedStream.connect()

    image.png

    key的数据类型

  • 不能是没有覆盖hashCode方法的POJO(也就是bean)

  • 不能是数组

    自定义分区器 解决数据倾斜

    image.png
    keySelector选择key,然后分区。

rebalance() & rescale

image.png
rescale

  • 数据传输都在一个TaskManager内不需要通过网络。

reduce

OutputTag:分流

定义:
image.png
image.png
image.png

状态与容错

重启job策略

image.png

state

image.png
image.png

Operator State

  • 只支持ListState
  • 绑定到特定operator并行实例,每个operator的并行实例维护一个状态

    Keyed State

  • 只能在作用于KeyedStrem上的function/Operator里使用

  • 每个并行keyed Operator的每个实例的每个key有一个Keyed State:即就是一个唯一的状态,由于每个key属于一个keyed operator的并行实例,因此我们可以将其简单地理解为
  • 支持所有类型

image.png

Broadcast State

  • 只支持MapState

    checkpoint开启

    默认没有开启checkpoint

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    //start a checkpoint every 1000 ms env.enableCheckpointing(1000);

    //advanced options:

    //set mode to exactly-once (this is the default) env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

    //make sure 500 ms of progress happen between checkpoints env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

    //checkpoints have to complete within one minute,or are discarded env.getCheckpointConfig().setCheckpointTimeout(60000);

    //allow only one checkpoint to be in progress at the same time env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

    //enable externalized checkpoints which are retained after job cancellation env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    env.getCheckpointConfig().setFailOnCheckpointingErrors(true);

Flink检查点(checkpoint)、保存点(savepoint)的区别与联系_LittleMagic’s Blog-CSDN博客

使用Operator State

方式1:实现CheckpointedFunction

Stateful function(RichFunction)实现CheckpointedFunction接口,必须实现两个方法:

  • void snapshotState(FunctionSnapshotContext context) throws Exception
    • Checkpoint执行时调用
    • 一般用于原始状态与托管状态进行交换
  • Void initializeState(FunctionlnitializationContext context) throws Exception;(初始化以及恢复逻辑)
    • Stateful function第一次初始化时调用
    • Stateful function从较早的checkpoint恢复时调用
    • 相比于open()方法:context可判断是否复活,恢复状态。

image.png

方式2:实现ListCheckpointed(@Deprecated)

这个接口自己本身就带了一个ListState
image.png

使用KeyedState

public class CountryCodeConnectKeyByKeyedState {

  1. public static void main(String[] args) throws Exception {
  2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3. env.enableCheckpointing(1000);
  4. CheckpointConfig checkpointConfig = env.getCheckpointConfig();
  5. //保存EXACTLY_ONCE
  6. checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  7. //每次ck之间的间隔,不会重叠
  8. checkpointConfig.setMinPauseBetweenCheckpoints(2000L);
  9. //每次ck的超时时间
  10. checkpointConfig.setCheckpointTimeout(20000L);
  11. //如果ck执行失败,程序是否停止
  12. checkpointConfig.setFailOnCheckpointingErrors(true);
  13. //job在执行CANCE的时候是否删除ck数据
  14. checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  15. //恢复策略
  16. env.setRestartStrategy(
  17. RestartStrategies.fixedDelayRestart(
  18. 3, // number of restart attempts
  19. Time.of(0, TimeUnit.SECONDS) // delay
  20. )
  21. );
  22. DataStreamSource<String> countryDictSource = env.addSource(new FileCountryDictSourceFunction());
  23. Properties kafkaConsumerProps = new Properties();
  24. kafkaConsumerProps.setProperty("bootstrap.servers", "s1.hadoop:9092,s2.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
  25. kafkaConsumerProps.setProperty("group.id", "qingniuflink");
  26. kafkaConsumerProps.setProperty("flink.partition-discovery.interval-millis", "30000");
  27. FlinkKafkaConsumer010<HainiuKafkaRecord> kafkaSource = new FlinkKafkaConsumer010<>("flink_event", new HainiuKafkaRecordSchema(), kafkaConsumerProps);
  28. // kafkaSource.setStartFromEarliest()
  29. // kafkaSource.setStartFromGroupOffsets()
  30. kafkaSource.setStartFromLatest();
  31. DataStreamSource<HainiuKafkaRecord> kafkainput = env.addSource(kafkaSource);
  32. KeyedStream<Tuple2<String, String>, String> countryDictKeyBy = countryDictSource.map(new MapFunction<String, Tuple2<String, String>>() {
  33. @Override
  34. public Tuple2<String, String> map(String value) throws Exception {
  35. String[] split = value.split("\t");
  36. return Tuple2.of(split[0], split[1]);
  37. }
  38. }).keyBy(new KeySelector<Tuple2<String, String>, String>() {
  39. @Override
  40. public String getKey(Tuple2<String, String> value) throws Exception {
  41. return value.f0;
  42. }
  43. });
  44. KeyedStream<HainiuKafkaRecord, String> record = kafkainput.keyBy(new KeySelector<HainiuKafkaRecord, String>() {
  45. @Override
  46. public String getKey(HainiuKafkaRecord value) throws Exception {
  47. return value.getRecord();
  48. }
  49. });
  50. ConnectedStreams<Tuple2<String, String>, HainiuKafkaRecord> connect = countryDictKeyBy.connect(record);
  51. SingleOutputStreamOperator<String> connectInput = connect.process(new KeyedCoProcessFunction<String, Tuple2<String, String>, HainiuKafkaRecord, String>() {
  52. private MapState<String,String> map = null;
  53. @Override
  54. public void open(Configuration parameters) throws Exception {
  55. //keyState的TTL策略
  56. StateTtlConfig ttlConfig = StateTtlConfig
  57. //keyState的超时时间为100秒
  58. .newBuilder(Time.seconds(100))
  59. //当创建和更新时,重新计时超时时间
  60. .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  61. //失败时不返回keyState的值
  62. //.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  63. //失败时返回keyState的值
  64. .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
  65. //ttl的时间处理等级目前只支持ProcessingTime
  66. .setTimeCharacteristic(StateTtlConfig.TimeCharacteristic.ProcessingTime)
  67. .build();
  68. //从runtimeContext中获得ck时保存的状态
  69. MapStateDescriptor<String,String> msd = new MapStateDescriptor<String, String>("map",String.class,String.class);
  70. msd.enableTimeToLive(ttlConfig);
  71. map = getRuntimeContext().getMapState(msd);
  72. }
  73. @Override
  74. public void processElement1(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
  75. map.put(ctx.getCurrentKey(), value.f1);
  76. out.collect(value.toString());
  77. }
  78. @Override
  79. public void processElement2(HainiuKafkaRecord value, Context ctx, Collector<String> out) throws Exception {
  80. for(Map.Entry<String,String> m:map.entries()){
  81. System.out.println(m.getKey());
  82. System.out.println(m.getValue());
  83. }
  84. if(value.getRecord().equals("CN")){
  85. int a = 1/0;
  86. }
  87. String countryCode = ctx.getCurrentKey();
  88. String countryName = map.get(countryCode);
  89. String outStr = countryName == null ? "no match" : countryName;
  90. out.collect(outStr);
  91. }
  92. });
  93. connectInput.print();
  94. env.execute();
  95. }

}

使用BroadcastState

Broadcast State使用套路(三步)

  • 创建常规事件流DataStream / KeyedDataStream
  • 创建BroadcastedStream:创建规则流 / 配置流(低吞吐)并广播
  • 连接两个Stream,生成BroadcastConnectedStream并实现计算处理
  • proccess(BroadcastProcessFunction and KeyedBroadcastProcessFunction)

image.png
BroadcastedStream和 keyedStream做connect:
KeyedBroadcastProcessFunction
image.png

Checkpoint原理

  • 通过往source 注入barrier
  • barrier作为checkpoint的标志

flink怎么保证Exactly Once

  • barrier n 所属的数据流先不处理,从这些数据流中接收到数据被放入接收缓存里(input buffer)
  • 当从最后一个流中提取到barrier n 时,operator 会发射出所有等待向后发送的数据,然后发射snapshot n 所属的barrier

image.png

State Backend

image.png

  • MemoryStateBackend:将state保存到JobManager中。默认,异步。
  • FsStateBackend(全量存储)
    • 需要配合ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
    • image.png
  • RocksDBStateBackend(增量存储)

Savepoint:手动checkpoint

image.png
恢复:

  • 从指定Savepoint恢复job
    $ bin/flink run -s :savepointPath [:runArgs]
  • 从指定Savepoint恢复job(允许跳过不能映射的状态,例如删除了一个operator)

$ bin/flink run -s :savepointPath -n [:runArgs]

状态重新分配

image.png

kafka的source与sink的容错

  • 如果Flink启用了检查点,Flink Kafka Consumer将会周期性的checkpoint其Kafka偏移量到快照。

kafka Consumer Offset提交:
image.png
image.png

time

image.png
image.png

watermarker

  • 窗口触发时间
  • watermark基于Event Time。

    提取watermark

    方法一
    image.png
    image.png
    方法二 提取watermark:
    image.png
    方法三:直接在source function中生成
    image.png

window

window设置窗口大小,watermark触发窗口。
image.png

window分类

image.png
按时间
image.png
数量
image.png
Global window
需要配置触发器,来触发窗口执行。

  • Trigger :触发器
  • Evictor :“驱逐者”

image.png

image.png

image.png

window operator

WindowsAll

WindowsAll会忽略keyby

windows join

  1. cogroup

image.png

  1. join

image.png
image.png

ProcessFunction

ProcessFunction可以看作是一个具有keyed state 和 timers访问权的FlatMapFunction。

timerService

image.png
KeyedCoProcessFunction中的变量,所有key共享。解决方法:使用ValueState。

  • 由于Flink对每个键和时间戳只维护一个计时器,因此可以通过降低计时器频率来合并计时器,从而减少计时器的数量。
  • event-time timer只会在watermarks到来时触发
  • 使用ctx.timerService().deleteEventTimeTimer删除过期Timer

    累加器

————

任务提交流程

image.png

window

window分类

window function 窗口函数

image.png

  • sum函数其实调用的aggregate函数,aggregate调用reduce
  • reduce接受两个相同类型的输入,生成一个同类型输出,所以泛型就一个
  • aggregate的输入值、中间结果值、输出值它们3个类型可以各不相同,泛型有

image.png

  • ReduceFunction处理增量数据
    • 来一个计算一个
  • WindowFunction处理全量数据
    • 等window到点一块计算

      ProcessWindowFunction

      image.png

其他api

image.png

延迟数据处理

sideOutputLateData

时间语义

watermark


  • ProcessFunction API

    image.png

  • 定时器

    • image.png
  • 侧输出流

    状态管理

    状态的分类

    image.pngf

  • 算子状态

    • 作用范围是一个算子(分区)
    • kafka记录
  • 键控状态
    • 作用范围是每个key
    • image.png
    • image.png
      • reduce
  • 状态后端

    • image.png

      checkpoint

      开启:
      image.png
      模式:
  • exactly-once(默认)

  • al least once

image.png
Flink检查点(checkpoint)、保存点(savepoint)的区别与联系_LittleMagic’s Blog-CSDN博客

state backends

默认异步快照

  • MemoryStateBackend:将state保存到JobManager中。
  • FsStateBackend
    • 需要配合ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
    • image.png
  • RocksDBStateBackend

    restart strategies

    image.png
    默认没有重启策略
    image.png

高可用

job持久化

image.png

checkpoint

image.png
image.png-

检查点

image.png