The Broadcast State Pattern 广播状态模式

With State描述操作符状态,在恢复时,操作符状态在操作符的并行任务中均匀分布,或者统一,整个状态用于初始化还原的并行任务。

第三种类型的支持 operator stateBroadcast State。引入广播状态以支持使用其中来自一个流的一些数据需要被广播到所有下游任务的使用情况,其中它被本地存储并且用于处理另一个流中的所有输入元素。作为广播状态可以作为自然拟合出现的示例,人们可以想象包含一组规则的低吞吐量流,我们希望对来自另一个流的所有元素进行评估。考虑到以上类型的使用情况,广播状态与运营商剩余部分的不同之处在于:

  1. 它具有MAP格式,
  2. 它仅适用于具有以下输入的特定运算符 broadcastednon-broadcasted ,并且
  3. 这样的运算符可以具有不同名称的 multiple broadcast states

Provided APIs 提供API

为了显示所提供的API,我们将从一个示例开始,然后介绍它们的全部功能。作为我们正在运行的示例,我们将使用这样的情况:我们有一个具有不同颜色和形状的对象流,并且我们希望找到与某一模式相同颜色的对象对,例如一个矩形,后面跟着一个三角形。我们假设一组有趣的模式会随着时间的推移而演变。

在本例中,第一个流将包含具有ColorShape属性的Item类型的元素。另一条溪流将包含 Rules

Items的流开始,我们只需要 key it by Color,因为我们想要相同颜色的对。这将确保相同颜色的元素在同一台物理机器上结束。

  1. // key the shapes by color
  2. KeyedStream<Item, Color> colorPartitionedStream = shapeStream
  3. .keyBy(new KeySelector<Shape, Color>(){...});

接下来是 Rules,包含这些内容的流应广播到所有下游任务,这些任务应在本地存储,以便能够对照所有传入的“Items”对它们进行评估。下面的代码片段将(I)广播规则流,(Ii)使用提供的MapStateDescriptor,它将创建存储规则的广播状态。

  1. // a map descriptor to store the name of the rule (string) and the rule itself.
  2. MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
  3. "RulesBroadcastState",
  4. BasicTypeInfo.STRING_TYPE_INFO,
  5. TypeInformation.of(new TypeHint<Rule>() {}));
  6. // broadcast the rules and create the broadcast state
  7. BroadcastStream<Rule> ruleBroadcastStream = ruleStream
  8. .broadcast(ruleStateDescriptor);

最后,为了评估Item流中对输入元素的Rules ,我们需要:

  1. connect the two streams, and
  2. specify our match detecting logic.

可以通过在非广播流中调用“连接()”来实现将流(键控或非键控)与“广播流”连接,其中“广播流”作为参数。这将返回一个“broadcastConnectedstream”,我们可以用一种特殊类型的“cowprocessfunction”调用“进程()”。函数将包含我们的匹配逻辑。函数的确切类型取决于非广播流的类型:

  • 如果是keyed,则函数是KeyedBroadcastProcessFunction
  • 如果它是 non-keyed,则该函数是BroadcastProcessFunction

假定我们的非广播流被键入,以下代码片段包括上述调用:

注意:应在非广播流中调用连接,并将广播流用作参数。

  1. DataStream<Match> output = colorPartitionedStream
  2. .connect(ruleBroadcastStream)
  3. .process(
  4. // type arguments in our KeyedBroadcastProcessFunction represent:
  5. // 1\. the key of the keyed stream
  6. // 2\. the type of elements in the non-broadcast side
  7. // 3\. the type of elements in the broadcast side
  8. // 4\. the type of the result, here a string
  9. new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
  10. // my matching logic
  11. }
  12. )

BroadcastProcessFunction and KeyedBroadcastProcessFunction 广播过程功能与关键—广播过程功能

CoProcessFunction一样,这些函数有两种实现的处理方法: processBroadcastElement() 负责处理广播流中的传入元素,以及用于非广播的 processElement() 。方法的完整签名如下:

  1. public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
  2. public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
  3. public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
  4. }
  1. public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {
  2. public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
  3. public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
  4. public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
  5. }

首先要注意的是这两个功能都需要执行广播侧的元件的 processElement() 方法和在非广播侧的元素的processBroadcastElement()

两种方法在提供的上下文中不同。非广播侧具有 ReadOnlyContext,而广播侧具有 Context

以下枚举中的两个上下文(“CTX”):

  1. 访问广播状态: ctx.getBroadcastState(MapStateDescriptor&lt;K, V&gt; stateDescriptor)
  2. 允许查询元素的时间戳: ctx.timestamp()
  3. 获取当前水印:ctx.currentWatermark()
  4. 获取当前处理时间:ctx.currentProcessingTime()
  5. 将元素发送到侧输出:ctx.output(OutputTag&lt;X&gt; outputTag, X value)

getBroadcastState() 中的 stateDescriptor 应与上文 .broadcast(ruleStateDescriptor) 中的“相同”。

差异在于每个人给予广播状态的访问类型。广播侧具有read-write access,而非广播侧具有 read-only access(因此,名称)。这是因为在flink中没有交叉任务通信。因此,为了保证广播状态的内容在我们运营商的所有并行实例中是相同的,我们只对广播侧给出读-写访问,广播侧在所有任务中看到相同的元素,并且我们需要在该一侧上的每个输入元素的计算在所有任务上是相同的。忽略此规则将破坏状态的一致性保证,导致调试结果不一致且往往很困难。

注意:processBroadcast()中实现的逻辑必须在所有并行实例中具有相同的确定性行为!

最后,由于 KeyedBroadcastProcessFunction 在键控流中操作,所以它暴露了一些不可用于 BroadcastProcessFunction的功能。即:

  1. processElement() 方法中的“ReadOnlyContext”方法允许访问Flink的底层计时器服务,该服务允许注册事件和/或处理时间计时器。当计时器触发时,使用‘OnTimerContext’调用 onTimer() (如上文所示),该OnTimerContext公开与ReadOnlyContextplus相同的功能。
    • 询问触发的计时器是否为事件或处理时间的能力
    • 查询与计时器关联的密钥。
  2. processBroadcastElement() 方法中的Context包含applyToKeyedState(StateDescriptor&lt;S, VS&gt; stateDescriptor, KeyedStateFunction&lt;KS, S&gt; function)的方法。这允许向与所提供的stateDescriptor相关联的所有密钥**的所有状态注册 KeyedStateFunction to be。

注意: 注册定时器仅在 KeyedBroadcastProcessFunctionprocessElement() 和此处仅有可能。在 processBroadcastElement()方法中不可能,因为没有与广播的元素相关联的密钥。

回到我们的原始示例,我们的KeyedBroadcastProcessFunction可以如下所示:

  1. new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
  2. // store partial matches, i.e. first elements of the pair waiting for their second element
  3. // we keep a list as we may have many first elements waiting
  4. private final MapStateDescriptor<String, List<Item>> mapStateDesc =
  5. new MapStateDescriptor<>(
  6. "items",
  7. BasicTypeInfo.STRING_TYPE_INFO,
  8. new ListTypeInfo<>(Item.class));
  9. // identical to our ruleStateDescriptor above
  10. private final MapStateDescriptor<String, Rule> ruleStateDescriptor =
  11. new MapStateDescriptor<>(
  12. "RulesBroadcastState",
  13. BasicTypeInfo.STRING_TYPE_INFO,
  14. TypeInformation.of(new TypeHint<Rule>() {}));
  15. @Override
  16. public void processBroadcastElement(Rule value,
  17. Context ctx,
  18. Collector<String> out) throws Exception {
  19. ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);
  20. }
  21. @Override
  22. public void processElement(Item value,
  23. ReadOnlyContext ctx,
  24. Collector<String> out) throws Exception {
  25. final MapState<String, List<Item>> state = getRuntimeContext().getMapState(mapStateDesc);
  26. final Shape shape = value.getShape();
  27. for (Map.Entry<String, Rule> entry :
  28. ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
  29. final String ruleName = entry.getKey();
  30. final Rule rule = entry.getValue();
  31. List<Item> stored = state.get(ruleName);
  32. if (stored == null) {
  33. stored = new ArrayList<>();
  34. }
  35. if (shape == rule.second && !stored.isEmpty()) {
  36. for (Item i : stored) {
  37. out.collect("MATCH: " + i + " - " + value);
  38. }
  39. stored.clear();
  40. }
  41. // there is no else{} to cover if rule.first == rule.second
  42. if (shape.equals(rule.first)) {
  43. stored.add(value);
  44. }
  45. if (stored.isEmpty()) {
  46. state.remove(ruleName);
  47. } else {
  48. state.put(ruleName, stored);
  49. }
  50. }
  51. }
  52. }

Important Considerations 重要考虑

在描述了提供的API之后,本节将重点介绍在使用广播状态时要记住的重要事项。它们是:

  • 没有交叉任务通信: 如前所述,这就是为什么只有(Keyed)-BroadcastProcessFunction的广播侧才能修改广播状态的内容的原因。此外,用户必须确保所有任务以与每个输入元素相同的方式修改广播状态的内容。否则,不同的任务可能有不同的内容,导致结果不一致。

  • 广播状态下的事件顺序可能因任务不同而不同: 虽然广播流的元素保证所有元素(最终)都会到达所有下游任务,但元素可能会以不同的顺序到达每个任务。因此,每个传入元素的状态更新不能依赖于传入事件的顺序。

  • 所有任务的广播状态: 虽然在发生检查点时,所有任务的广播状态中都有相同的元素(检查点屏障不跨越元素),但所有任务的广播状态都是检查点,而不仅仅是其中的一个。这是一个设计决定,以避免在还原过程中从同一个文件中读取所有任务(从而避免热点),尽管代价是将校验状态的大小增加一倍(=并行性)。Flink保证在恢复/重新缩放时,将没有重复的没有丢失的数据**。在具有相同或较小并行性的恢复情况下,每个任务都读取其校验点状态。升级后,每个任务读取自己的状态,其余任务(‘p_new-`p_old’)以循环方式读取以前任务的检查点。

  • 没有RocksDB状态后端: 广播状态保存在运行时内存中,应该相应地进行内存配置。这适用于所有运算符状态。