如果对同一个 tuple ack 偶数次,会导致内存持续增长,使用 Storm 时应避免此类操作;下文为诊断过程。

storm 版本:1.1.0

  1. 使用 jcmd 命令导出 heapdump;
  2. 将 heapdump 导入 jvisualvm,分析最大的20个对象,为 clojure.lang.PersistentArrayMap;
  3. 进一步分析得知是 acker$mk_aker_bolt$reify__972 中的 pending.o._buckets;
  4. 对应的 storm 源码为:org/apache/storm/daemon/acker.clj 第 52 ~ 94 行;

第 57 行:

  1. (let [id (.getValue tuple 0) ; tuple 中取出 id

第59行:

  1. [curr (.get pending id) ; pending 中取出当前 tuple 的状态信息,放入 curr

第61~67行,针对不同阶段更新 tuple 状态

第62~64行,初始化阶段

  1. ((update-ack (.getValue tuple 1)) ; 更新 ack 校验码
  2. (assoc :spout-task (.getValue tuple 2)) ; 记录 spout-task
  3. (assoc :start-time (System/currentTimeMillis))) ; start-time

第65行,ack阶段:

  1. (update-ack curr (.getValue tuple 1)) ; 校验码归零
  2. ;; 注意 update-ack 的实现是 xor,初始值为 0,正常结束时 init ack 阶段相互抵消,值仍为 0
  3. ;; 如果多 ack一次,将导致结果非 0

第66行,fail阶段,第67行,超时阶段

第69~92行是后续处理:
70~77:正常结束(ack一次)

  1. (.remove pending id) ; pending 中移除 tuple 的状态信息

78~92:出错处理,也会移除状态信息

ack 的前逻辑 org/apache/storm/daemon/executor.clj 第 800~815 行

  1. (^void ack [this ^Tuple tuple]
  2. (let [^TupleImpl tuple tuple
  3. ack-val (.getAckVal tuple)]
  4. (fast-map-iter [[root id] (.. tuple getMessageId getAnchorsToIds)]
  5. (task/send-unanchored task-data ; anchorsTolds 逐一发送
  6. ACKER-ACK-STREAM-ID
  7. [root (bit-xor id ack-val)])))
  8. (let [delta (tuple-time-delta! tuple)]
  9. (when debug?
  10. (log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple))
  11. (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
  12. (when delta ; 更新 stats 相关
  13. (stats/bolt-acked-tuple! executor-stats
  14. (.getSourceComponent tuple)
  15. (.getSourceStreamId tuple)
  16. delta))))

对应 org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java 第120~144行

调用 pending.put() 的两处:
org/apache/storm/daemon/executor.clj#L580
org/apache/storm/daemon/acker.clj#L68

调用 pending.remove(id) 的两处:
org/apache/storm/daemon/executor.clj#L526
org/apache/storm/daemon/acker.clj#L72

正常流程
executor put
acker put [acker_init]
acker put [
acker_ack] 归零 + acker remove
executor remove

异常流程
executor put
acker put
acker put [__acker_init]
acker put [acker_ack] 归零 + acker remove
acker put [acker_ack] 未归零,导致无法被删除
executor remove

设计缺陷?

代码场景是:一条 tuple 衍生出多条 sql,每次执行 sql 后会 ack tuple,这样便引发了重复 ack。

与其说 storm 没有容忍重复 ack,不如说使用 storm 的方式不当。按照 storm 的理念,如果一条 tuple A 要衍生出多条 sql,应该额外创建一个 Bolt 来为这些 sql 生成 tuple B1, B2,然后分别 ack B1, B2,最后基于依赖关系,A 也可被正确地 ack。

参考

Acking framework implementation