如果对同一个 tuple ack 偶数次,会导致内存持续增长,使用 Storm 时应避免此类操作;下文为诊断过程。
storm 版本:1.1.0
- 使用 jcmd 命令导出 heapdump;
- 将 heapdump 导入 jvisualvm,分析最大的20个对象,为 clojure.lang.PersistentArrayMap;
- 进一步分析得知是
acker$mk_aker_bolt$reify__972
中的 pending.o._buckets; - 对应的 storm 源码为:org/apache/storm/daemon/acker.clj 第 52 ~ 94 行;
第 57 行:
(let [id (.getValue tuple 0) ; 从 tuple 中取出 id
第59行:
[curr (.get pending id) ; 从 pending 中取出当前 tuple 的状态信息,放入 curr
第61~67行,针对不同阶段更新 tuple 状态
第62~64行,初始化阶段
((update-ack (.getValue tuple 1)) ; 更新 ack 校验码
(assoc :spout-task (.getValue tuple 2)) ; 记录 spout-task
(assoc :start-time (System/currentTimeMillis))) ; start-time
第65行,ack阶段:
(update-ack curr (.getValue tuple 1)) ; 校验码归零
;; 注意 update-ack 的实现是 xor,初始值为 0,正常结束时 init 与 ack 阶段相互抵消,值仍为 0
;; 如果多 ack一次,将导致结果非 0
第66行,fail阶段,第67行,超时阶段
第69~92行是后续处理:
70~77:正常结束(ack一次)
(.remove pending id) ; 从 pending 中移除 tuple 的状态信息
78~92:出错处理,也会移除状态信息
ack 的前逻辑 org/apache/storm/daemon/executor.clj 第 800~815 行
(^void ack [this ^Tuple tuple]
(let [^TupleImpl tuple tuple
ack-val (.getAckVal tuple)]
(fast-map-iter [[root id] (.. tuple getMessageId getAnchorsToIds)]
(task/send-unanchored task-data ; 将 anchorsTolds 逐一发送
ACKER-ACK-STREAM-ID
[root (bit-xor id ack-val)])))
(let [delta (tuple-time-delta! tuple)]
(when debug?
(log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple))
(task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
(when delta ; 更新 stats 相关
(stats/bolt-acked-tuple! executor-stats
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
delta))))
对应 org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java 第120~144行
调用 pending.put() 的两处:
org/apache/storm/daemon/executor.clj#L580
org/apache/storm/daemon/acker.clj#L68
调用 pending.remove(id) 的两处:
org/apache/storm/daemon/executor.clj#L526
org/apache/storm/daemon/acker.clj#L72
正常流程
executor put
acker put [acker_init]
acker put [acker_ack] 归零 + acker remove
executor remove
异常流程
executor put
acker put
acker put [__acker_init]
acker put [acker_ack] 归零 + acker remove
acker put [acker_ack] 未归零,导致无法被删除
executor remove
设计缺陷?
代码场景是:一条 tuple 衍生出多条 sql,每次执行 sql 后会 ack tuple,这样便引发了重复 ack。
与其说 storm 没有容忍重复 ack,不如说使用 storm 的方式不当。按照 storm 的理念,如果一条 tuple A 要衍生出多条 sql,应该额外创建一个 Bolt 来为这些 sql 生成 tuple B1, B2,然后分别 ack B1, B2,最后基于依赖关系,A 也可被正确地 ack。