Joining Streams in Storm Core

Storm 支持通过 JoinBolt 来 join 多个 data streams 变成一个 stream. JoinBolt是一个 Windowed bolt。JoinBolt 会等待配置的窗口时间来匹配被join 的streams的tuples。这有助于通过窗口边界生成streams.

JoinBolt 每个进来的 data streams 必须基于一个字段进行 Field Group。stream只能使用被 FieldsGrouped的字段 join 其他stream。

Performing Joins

考虑下面的 SQL join,设计四张表:

  1. select userId, key4, key2, key3
  2. from table1
  3. inner join table2 on table2.userId = table1.key1
  4. inner join table3 on table3.key3 = table2.userId
  5. left join table4 on table4.key4 = table3.key3

相同的可以使用JoinBolt,join四个spouts,生成想要的tuples:

  1. JoinBolt jbolt = new JoinBolt("spout1", "key1") // from spout1
  2. .join ("spout2", "userId", "spout1") // inner join spout2 on spout2.userId = spout1.key1
  3. .join ("spout3", "key3", "spout2") // inner join spout3 on spout3.key3 = spout2.userId
  4. .leftJoin ("spout4", "key4", "spout3") // left join spout4 on spout4.key4 = spout3.key3
  5. .select ("userId, key4, key2, spout3:key3") // chose output fields
  6. .withTumblingWindow( new Duration(10, TimeUnit.MINUTES) ) ;
  7. topoBuilder.setBolt("joiner", jbolt, 1)
  8. .fieldsGrouping("spout1", new Fields("key1") )
  9. .fieldsGrouping("spout2", new Fields("userId") )
  10. .fieldsGrouping("spout3", new Fields("key3") )
  11. .fieldsGrouping("spout4", new Fields("key4") );

bolt 构造器需要两个参数.第一个参数介绍了第一个stream来自于 spout1,并指定了通过key1来和其他 streams 连接.组件的名称必须根据直接连接 Join bolt的 spout 或者bolt来设置.这里,来自于spout1的数据必须根据 key1 来 field group。同样的,调用 leftJoin()join() 方法的时候,也会通过这个字段来join.根据上面的例子,FieldGrouping 要求也适用于其他spout 的streams。第三个参数表示streams要和哪个spout的streams连接.

select() 方法用来指定 output fields。select 参数是逗号分隔的字段列表。单个字段可以通过 stream名称作为前缀,来区别不同streams中相同的字段,像这样:.select("spout3:key3, spout4:key3").嵌套的tuple 类型是支持的.例如,outer.inner.innermost 就是一个字段嵌套三层深度,outerinnerMap 的类型.

join 参数中的字段不允许用 stream 名称作为前缀,但是支持嵌套字段.

上面调用 withTumblingWindow()方法,将join 窗口配置成10分钟的翻滚窗口.由于 JoinBolt 是一个窗口 spout,我们还可以使用 withWindow 方法将其配置为滑动窗口(参考下面的提示部分).

Stream names and Join order

  • Streams name 在引用之前必须声明,在构造函数和 join方法的第一个参数都需要 Streams name,join方法的第三个参数会用到Stream name.像下面这样引用stream name是不允许的:
  1. new JoinBolt( "spout1", "key1")
  2. .join ( "spout2", "userId", "spout3") //not allowed. spout3 not yet introduced
  3. .join ( "spout3", "key3", "spout1")
  • 在内部,join将按照用户所表示的顺序执行。

Joining based on Stream names

为了简单起见,Storm topology(拓扑)经常使用 default stream 。拓扑也可以使用命名的stream 而不是default stream。为了支持这种 topology(拓扑),可以通过构造函数的第一个参数将 JoinBolt 配置为使用 stream name 而不是源组件(spout / bolt)名称:

  1. new JoinBolt(JoinBolt.Selector.STREAM, "stream1", "key1")
  2. .join("stream2", "key2")
  3. ...

第一个参数 JoinBolt.Selector.STREAM 通知 bolt stream1/2/3/4 引用 named stream (而不是上游 spouts/bolts的名称)。

以下示例从四个 spouts 连接两个命名流:

  1. new JoinBolt(JoinBolt.Selector.STREAM, "stream1", "key1")
  2. .join ("stream2", "userId", "stream1" )
  3. .select ("userId, key1, key2")
  4. .withTumblingWindow( new Duration(10, TimeUnit.MINUTES) ) ;
  5. topoBuilder.setBolt("joiner", jbolt, 1)
  6. .fieldsGrouping("bolt1", "stream1", new Fields("key1") )
  7. .fieldsGrouping("bolt2", "stream1", new Fields("key1") )
  8. .fieldsGrouping("bolt3", "stream2", new Fields("userId") )
  9. .fieldsGrouping("bolt4", "stream1", new Fields("key1") );

在上述示例中,例如,spout1也可能发送其他 stream。但是连接 bolt 只是从不同的 bolts 订阅了stream1&stream2。来自bolt1bolt2bolt4stream1被视为单个 stream,并且与bolt3相连接。

Limitations:

1.当前值支持 INNER 和LEFT join.

  1. 不同于SQL,它允许通过不同的 keys 将相同的表和不同的表连接,这里必须在stream 上使用相同的字段. Fields Grouping 保证tuples被正确连接到 JoinBolt的实例.因此,FieldsGrouping字段必须与 join 字段相同,以获得正确的结果.要在多个字段上执行 join,可以将这些字段组着成一个字段,然后发送到 Join Bolt。

Tips:

  1. Join 可以是CPU和内存密集型.当前窗口中积累的数据越大(与窗口长度成正比),join所需要的时间就越长。滑动间隔很短(例如几秒钟)会触发频繁的连接.因此,如果使用大的窗口长度或者小的滑动间隔,则性能可能受损.

  2. 使用滑动窗口时,跨窗口重复 join 记录。这是因为使用滑动窗口时,tuples 在多个窗口中继续存在。

  3. 如果启用了消息超时,请确保超时设置(topology.message.timeout.secs)足够大以舒适地适应窗口大小,以及其他 spouts 和 bolts 的附加处理。

  4. 在最坏的情况下,连接一个具有M和N个元素的两个 streams 的窗口,可以产生每个输出元组的MxN元素,每个输出 tuple 从每个输入流锚定到一个 tuple 。这可能意味着来自JoinBolt的大量输出元组和甚至更多的ACK用于下游 spout 发出。这可能会对消息传递系统造成重大压力,如果不小心,则会大大减缓 topology(拓扑)结构。要管理消息传递子系统上的负载,建议:

    • 增加 worker 堆(topology.worker.max.heap.size.mb)。
    • 如果您的 topology(拓扑)不需要ACK,则禁用ACKers(topology.acker.executors = 0)。
    • 禁用事件记录器(topology.eventlogger.executors = 0)。
    • 打开拓扑调试(topology.debug = false)。
    • 将topology.max.spout设置为一个足够大的值,以容纳估计的全窗口值的 tuple 加上一些更多的余量。这有助于减少在消息传递子系统遇到过载时发出过多元组的端口的可能性。当它的值设置为null时,可能会发生这种情况。
    • 最后,将窗口大小保持在解决手头问题所需的最小值。