1. BroadcastState

1.1 BroadcastState介绍

在开发过程中,如果遇到需要下发/广播配置、规则等低吞吐事件流到下游所有 task 时,就可以使用 Broadcast State。Broadcast State 是 Flink 1.5 引入的新特性。
下游的task 接收这些配置、规则并保存为 BroadcastState,将这些配置应用到另一个数据流的计算中 。

场景举例
1) 动态更新计算规则: 如事件流需要根据最新的规则进行计算,则可将规则作为广播状态广播到下游Task中。
2) 实时增加额外字段: 如事件流需要实时增加用户的基础信息,则可将用户的基础信息作为广播状态广播到下游Task中。
API介绍
首先创建一个Keyed 或Non-Keyed 的DataStream,
然后再创建一个BroadcastedStream,
最后通过DataStream来连接(调用connect 方法)到Broadcasted Stream 上,
这样实现将BroadcastState广播到Data Stream 下游的每个Task中。

1.如果DataStream是Keyed Stream ,则连接到Broadcasted Stream 后, 添加处理ProcessFunction 时需要使用KeyedBroadcastProcessFunction 来实现, 下面是KeyedBroadcastProcessFunction 的API,代码如下所示:

  1. public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
  2. public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
  3. public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
  4. }

上面泛型中的各个参数的含义,说明如下:
KS:表示Flink 程序从最上游的Source Operator 开始构建Stream,当调用keyBy 时所依赖的Key 的类型;
IN1:表示非Broadcast 的Data Stream 中的数据记录的类型;
IN2:表示Broadcast Stream 中的数据记录的类型;
OUT:表示经过KeyedBroadcastProcessFunction 的processElement()和processBroadcastElement()方法处理后输出结果数据记录的类型。
2.如果Data Stream 是Non-Keyed Stream,则连接到Broadcasted Stream 后,添加处理ProcessFunction 时需要使用BroadcastProcessFunction 来实现, 下面是BroadcastProcessFunction 的API,代码如下所示:

  1. public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
  2. public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
  3. public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
  4. }

上面泛型中的各个参数的含义,与前面KeyedBroadcastProcessFunction 的泛型类型中的后3 个含义相同,只是没有调用keyBy 操作对原始Stream 进行分区操作,就不需要KS 泛型参数。
具体如何使用上面的BroadcastProcessFunction,接下来我们会在通过实际编程,来以使用KeyedBroadcastProcessFunction 为例进行详细说明。

注意事项
1) Broadcast State 是Map 类型,即K-V 类型。
2) Broadcast State 只有在广播的一侧, 即在BroadcastProcessFunction 或KeyedBroadcastProcessFunction 的processBroadcastElement 方法中可以修改。在非广播的一侧,即在BroadcastProcessFunction 或KeyedBroadcastProcessFunction 的processElement 方法中只读。
3) Broadcast State 中元素的顺序,在各Task 中可能不同。基于顺序的处理,需要注意。
4) Broadcast State 在Checkpoint 时,每个Task 都会Checkpoint 广播状态。
5) Broadcast State 在运行时保存在内存中,目前还不能保存在RocksDB State Backend 中。

1.2 需求-实现配置动态更新

image.png
实时过滤出配置中的用户,并在事件流中补全这批用户的基础信息。
事件流:表示用户在某个时刻浏览或点击了某个商品,格式如下。

{“userID”: “user_3”, “eventTime”: “2019-08-17 12:19:47”, “eventType”: “browse”, “productID”: 1} {“userID”: “user_2”, “eventTime”: “2019-08-17 12:19:48”, “eventType”: “click”, “productID”: 1}

1.3 编码步骤

  1. env
  2. source -1.构建实时数据事件流-自定义随机 -2.构建配置流-从MySQL <用户id,<姓名,年龄>>
  3. transformation -1.定义状态描述器 MapStateDescriptor>> descriptor = new MapStateDescriptor<>(“config”,Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));

-2.广播配置流 BroadcastStream>> broadcastDS = configDS.broadcast(descriptor);

-3.将事件流和广播流进行连接 BroadcastConnectedStream, Map>> connectDS =eventDS.connect(broadcastDS);

-4.处理连接后的流-根据配置流补全事件流中的用户的信息

  1. sink
  2. execute

2. 双流Join

2.1 介绍

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/joining.html
https://zhuanlan.zhihu.com/p/340560908
https://blog.csdn.net/andyonlines/article/details/108173259
image.png
双流Join是Flink面试的高频问题。一般情况下说明以下几点就可以hold了:
Join大体分类只有两种:Window Join和Interval Join。
Window Join又可以根据Window的类型细分出3种:
Tumbling Window Join、Sliding Window Join、Session Widnow Join。
Windows类型的join都是利用window的机制,先将数据缓存在Window State中,当窗口触发计算时,执行join操作;
interval join也是利用state存储数据再处理,区别在于state中的数据有失效机制,依靠数据触发数据清理;
目前Stream join的结果是数据的笛卡尔积;两个流所有符合条件的组合

2.2 Window Join

Tumbling Window Join
执行翻滚窗口联接时,具有公共键和公共翻滚窗口的所有元素将作为成对组合联接,并传递给JoinFunction或FlatJoinFunction。因为它的行为类似于内部连接,所以一个流中的元素在其滚动窗口中没有来自另一个流的元素,因此不会被发射!
如图所示,我们定义了一个大小为2毫秒的翻滚窗口,结果窗口的形式为[0,1]、[2,3]、。。。。该图显示了每个窗口中所有元素的成对组合,这些元素将传递给JoinFunction。注意,在翻滚窗口[6,7]中没有发射任何东西,因为绿色流中不存在与橙色元素⑥和⑦结合的元素。
image.png

  1. import org.apache.flink.api.java.functions.KeySelector;
  2. import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
  3. import org.apache.flink.streaming.api.windowing.time.Time;
  4. ...
  5. DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
  6. orangeStream.join(greenStream)
  7. .where(<KeySelector>)
  8. .equalTo(<KeySelector>)
  9. .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
  10. .apply (new JoinFunction<Integer, Integer, String> (){
  11. @Override
  12. public String join(Integer first, Integer second) {
  13. return first + "," + second;
  14. }
  15. });

Sliding Window Join:
在执行滑动窗口联接时,具有公共键和公共滑动窗口的所有元素将作为成对组合联接,并传递给JoinFunction或FlatJoinFunction。在当前滑动窗口中,一个流的元素没有来自另一个流的元素,则不会发射!请注意,某些元素可能会连接到一个滑动窗口中,但不会连接到另一个滑动窗口中!
在本例中,我们使用大小为2毫秒的滑动窗口,并将其滑动1毫秒,从而产生滑动窗口[-1,0],[0,1],[1,2],[2,3]…。x轴下方的连接元素是传递给每个滑动窗口的JoinFunction的元素。在这里,您还可以看到,例如,在窗口[2,3]中,橙色②与绿色③连接,但在窗口[1,2]中没有与任何对象连接。
image.png

  1. import org.apache.flink.api.java.functions.KeySelector;
  2. import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
  3. import org.apache.flink.streaming.api.windowing.time.Time;
  4. ...
  5. DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
  6. orangeStream.join(greenStream)
  7. .where(<KeySelector>)
  8. .equalTo(<KeySelector>)
  9. .window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
  10. .apply (new JoinFunction<Integer, Integer, String> (){
  11. @Override
  12. public String join(Integer first, Integer second) {
  13. return first + "," + second;
  14. }
  15. });

Session Window Join:
在执行会话窗口联接时,具有相同键(当“组合”时满足会话条件)的所有元素以成对组合方式联接,并传递给JoinFunction或FlatJoinFunction。同样,这执行一个内部连接,所以如果有一个会话窗口只包含来自一个流的元素,则不会发出任何输出!
在这里,我们定义了一个会话窗口连接,其中每个会话被至少1ms的间隔分割。有三个会话,在前两个会话中,来自两个流的连接元素被传递给JoinFunction。在第三个会话中,绿色流中没有元素,所以⑧和⑨没有连接!
image.png

  1. import org.apache.flink.api.java.functions.KeySelector;
  2. import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
  3. import org.apache.flink.streaming.api.windowing.time.Time;
  4. ...
  5. DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
  6. orangeStream.join(greenStream)
  7. .where(<KeySelector>)
  8. .equalTo(<KeySelector>)
  9. .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
  10. .apply (new JoinFunction<Integer, Integer, String> (){
  11. @Override
  12. public String join(Integer first, Integer second) {
  13. return first + "," + second;
  14. }
  15. });

2.3 Interval Join

前面学习的Window Join必须要在一个Window中进行JOIN,那如果没有Window如何处理呢?
interval join也是使用相同的key来join两个流(流A、流B),
并且流B中的元素中的时间戳,和流A元素的时间戳,有一个时间间隔。
b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]
or
a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
也就是:
流B的元素的时间戳 ≥ 流A的元素时间戳 + 下界,且,流B的元素的时间戳 ≤ 流A的元素时间戳 + 上界。
image.png
在上面的示例中,我们将两个流“orange”和“green”连接起来,其下限为-2毫秒,上限为+1毫秒。默认情况下,这些边界是包含的,但是可以应用.lowerBoundExclusive()和.upperBoundExclusive来更改行为
orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound

  1. import org.apache.flink.api.java.functions.KeySelector;
  2. import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
  3. import org.apache.flink.streaming.api.windowing.time.Time;
  4. ...
  5. DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
  6. orangeStream
  7. .keyBy(<KeySelector>)
  8. .intervalJoin(greenStream.keyBy(<KeySelector>))
  9. .between(Time.milliseconds(-2), Time.milliseconds(1))
  10. .process (new ProcessJoinFunction<Integer, Integer, String(){
  11. @Override
  12. public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
  13. out.collect(first + "," + second);
  14. }
  15. });

2.4 代码演示

需求
来做个案例:
使用两个指定Source模拟数据,一个Source是订单明细,一个Source是商品数据。我们通过window join,将数据关联到一起。
思路
1、Window Join首先需要使用where和equalTo指定使用哪个key来进行关联,此处我们通过应用方法,基于GoodsId关联两个流中的元素。
2、设置5秒的滚动窗口,流的元素关联都会在这个5秒的窗口中进行关联。
3、apply方法中实现将两个不同类型的元素关联并生成一个新类型的元素。

  1. //商品类(商品id,商品名称,商品价格)
  2. //订单明细类(订单id,商品id,商品数量)
  3. //关联结果(商品id,商品名称,商品数量,商品价格*商品数量)
  4. DataStream<FactOrderItem> resultDS = goodsDSWithWatermark.join(OrderItemDSWithWatermark)
  5. .where(Goods::getGoodsId)
  6. .equalTo(OrderItem::getGoodsId)
  7. .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  8. //<IN1, IN2, OUT>
  9. .apply(new JoinFunction<Goods, OrderItem, FactOrderItem>() {
  10. @Override
  11. public FactOrderItem join(Goods first, OrderItem second) throws Exception {
  12. FactOrderItem result = new FactOrderItem();
  13. result.setGoodsId(first.getGoodsId());
  14. result.setGoodsName(first.getGoodsName());
  15. result.setCount(new BigDecimal(second.getCount()));
  16. result.setTotalMoney(new BigDecimal(second.getCount()).multiply(first.getGoodsPrice()));
  17. return result;
  18. }
  19. });

2.5 代码演示

1、通过keyBy将两个流join到一起。
2、interval join需要设置流A去关联哪个时间范围的流B中的元素。此处,我设置的下界为-1、上界为0,且上界是一个开区间。表达的意思就是流A中某个元素的时间,对应上一秒的流B中的元素。
3、process中将两个key一样的元素,关联在一起,并加载到一个新的FactOrderItem对象中。

  1. //商品类(商品id,商品名称,商品价格)
  2. //订单明细类(订单id,商品id,商品数量)
  3. //关联结果(商品id,商品名称,商品数量,商品价格*商品数量)
  4. SingleOutputStreamOperator<FactOrderItem> resultDS = goodsDSWithWatermark.keyBy(Goods::getGoodsId)
  5. .intervalJoin(OrderItemDSWithWatermark.keyBy(OrderItem::getGoodsId))
  6. //join的条件:
  7. // 条件1.id要相等
  8. // 条件2. OrderItem的时间戳 - 2 <=Goods的时间戳 <= OrderItem的时间戳 + 1
  9. .between(Time.seconds(-2), Time.seconds(1))
  10. //ProcessJoinFunction<IN1, IN2, OUT>
  11. .process(new ProcessJoinFunction<Goods, OrderItem, FactOrderItem>() {
  12. @Override
  13. public void processElement(Goods left, OrderItem right, Context ctx, Collector<FactOrderItem> out) throws Exception {
  14. FactOrderItem result = new FactOrderItem();
  15. result.setGoodsId(left.getGoodsId());
  16. result.setGoodsName(left.getGoodsName());
  17. result.setCount(new BigDecimal(right.getCount()));
  18. result.setTotalMoney(new BigDecimal(right.getCount()).multiply(left.getGoodsPrice()));
  19. out.collect(result);
  20. }
  21. });