要想实现消息有序,需要从 Producer 和 Consumer 两方面来考虑。
如果对 Kafka 不了解的话,可以先看这篇博客《一文快速了解 Kafka》。
针对消息有序的业务需求,还分为全局有序和局部有序。
- 全局有序:一个 Topic 下的所有消息都需要按照生产顺序消费。
- 局部有序:一个 Topic 下的消息,只需要满足同一业务字段的要按照生产顺序消费。例如:Topic 消息是订单的流水表,包含订单 orderId,业务要求同一个 orderId 的消息需要按照生产顺序进行消费。
全局有序
由于 Kafka 的一个 Topic 可以分为了多个 Partition,Producer 发送消息的时候,是分散在不同 Partition 的。当 Producer 按顺序发消息给 Broker,但进入 Kafka 之后,这些消息就不一定进到哪个 Partition,会导致顺序是乱的。
因此要满足全局有序,需要 1 个 Topic 只能对应 1 个 Partition。
而且对应的 consumer 也要使用单线程或者保证消费顺序的线程模型,否则会出现下图所示,消费端造成的消费乱序。
局部有序
要满足局部有序,只需要在发消息的时候指定 Partition Key,Kafka 对其进行 Hash 计算,根据计算结果决定放入哪个 Partition。这样 Partition Key 相同的消息会放在同一个 Partition。此时,Partition 的数量仍然可以设置多个,提升 Topic 的整体吞吐量。
如下图所示,在不增加 partition 数量的情况下想提高消费速度,可以考虑再次 hash 唯一标识(例如订单 orderId)到不同的线程上,多个消费者线程并发处理消息(依旧可以保证局部有序)。
二、生产者在发送消息的时候指定要发送到哪个 Partition (分区)。
我们需要将 producer 发送的数据封装成一个 ProducerRecord 对象。
(1)指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
(2)没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition
数进行取余得到 partition 值;
(3)既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后
面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition
值,也就是常说的 round-robin 算法。
消息重试对顺序消息的影响
对于一个有着先后顺序的消息 A、B,正常情况下应该是 A 先发送完成后再发送 B,但是在异常情况下,在 A 发送失败的情况下,B 发送成功,而 A 由于重试机制在 B 发送完成之后重试发送成功了。这时对于本身顺序为 AB 的消息顺序变成了 BA。
针对这种问题,严格的顺序消费还需要 max.in.flight.requests.per.connection 参数的支持。
该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,同时也会提升吞吐量。把它设为 1 就可以保证消息是按照发送的顺序写入服务器的。
此外,对于某些业务场景,设置 max.in.flight.requests.per.connection=1 会严重降低吞吐量,如果放弃使用这种同步重试机制,则可以考虑在消费端增加失败标记的记录,然后用定时任务轮询去重试这些失败的消息并做好监控报警。