消息丢失
13.1、Producer端如何保证消息不丢失
- 采取send()同步发消息,发送结果是同步感知的。
发送失败后可以重试,设置重试次数。默认3次。
producer.setRetryTimesWhenSendFailed(10);
集群部署,比如发送失败了的原因可能是当前Broker宕机了,重试的时候会发送到其他Broker上。
13.2、Broker端如何保证消息不丢失
修改刷盘策略为同步刷盘。默认情况下是异步刷盘的。
flushDiskType = SYNC_FLUSH
-
13.3、Consumer端如何保证消息不丢失
完全消费正常后在进行手动ack确认。
从Producer分析:如何确保消息正确的发送到了Broker?
- 默认情况下,可以通过同步的方式阻塞式的发送,check SendStatus,状态是OK,表示消息一定成功的投递到了Broker,状态超时或者失败,则会触发默认的2次重试。此方法的发送结果,可能Broker存储成功了,也可能没成功
- 采取事务消息的投递方式,并不能保证消息100%投递成功到了Broker,但是如果消息发送Ack失败的话,此消息会存储在CommitLog当中,但是对ConsumerQueue是不可见的。可以在日志中查看到这条异常的消息,严格意义上来讲,也并没有完全丢失
- RocketMQ支持 日志的索引,如果一条消息发送之后超时,也可以通过查询日志的API,来check是否在Broker存储成功
从Broker分析:如果确保接收到的消息不会丢失?
- 消息支持持久化到Commitlog里面,即使宕机后重启,未消费的消息也是可以加载出来的
- Broker自身支持同步刷盘、异步刷盘的策略,可以保证接收到的消息一定存储在本地的内存中
- Broker集群支持 1主N从的策略,支持同步复制和异步复制的方式,同步复制可以保证即使Master 磁盘崩溃,消息仍然不会丢失
作者:黄靠谱 链接:https://www.jianshu.com/p/3213d8c29fd0
重复消费
影响消息正常发送和消费的重要原因是网络的不确定性。
引起重复消费的原因
- ACK
正常情况下在consumer真正消费完消息后应该发送ack,通知broker该消息已正常消费,从queue中剔除
当ack因为网络原因无法发送到broker,broker会认为词条消息没有被消费,此后会开启消息重投机制把消息再次投递到consumer
- 消费模式
在CLUSTERING模式下,消息在broker中会保证相同group的consumer消费一次,但是针对不同group的consumer会推送多次
解决方案
- 数据库表
处理消息前,使用消息主键在表中带有约束的字段中insert
- Map
单机时可以使用map ConcurrentHashMap -> putIfAbsent guava cache
- Redis
分布式锁搞起来。
代码问题导致的重复消费问题案例
https://blog.csdn.net/weixin_41047933/article/details/87865920
12、如何让RocktMQ保证消息的顺序消费
你们线上业务用消息中间件的时候,是否需要保证消息的顺序性? 如果不需要保证消息顺序,为什么不需要?假如我有一个场景要保证消息的顺序,你们应该如何保证?
首先多个queue只能保证单个queue里的顺序,queue是典型的FIFO,天然顺序。多个queue同时消费是无法绝对保证消息的有序性的。所以总结如下:
同一topic,同一个QUEUE,发消息的时候一个线程去发送消息,消费的时候 一个线程去消费一个queue里的消息。
追问:怎么保证消息发到同一个queue?
Rocket MQ给我们提供了MessageQueueSelector接口,可以自己重写里面的接口,实现自己的算法,举个最简单的例子:判断i % 2 == 0
,那就都放到queue1里,否则放到queue2里。
3、RockMQ如何保证消息不丢失首先在如下三个部分都可能会出现丢失消息的情况:
- Producer端
- Broker端
- Consumer端
14、rocketMQ的消息堆积如何处理
下游消费系统如果宕机了,导致几百万条消息在消息中间件里积压,此时怎么处理? 你们线上是否遇到过消息积压的生产故障?如果没遇到过,你考虑一下如何应对?
首先要找到是什么原因导致的消息堆积,是Producer太多了,Consumer太少了导致的还是说其他情况,总之先定位问题。
然后看下消息消费速度是否正常,正常的话,可以通过上线更多consumer临时解决消息堆积问题
追问:如果Consumer和Queue不对等,上线了多台也在短时间内无法消费完堆积的消息怎么办?
- 准备一个临时的topic
- queue的数量是堆积的几倍
- queue分布到多Broker中
- 上线一台Consumer做消息的搬运工,把原来Topic中的消息挪到新的Topic里,不做业务逻辑处理,只是挪过去
- 上线N台Consumer同时消费临时Topic中的数据
- 改bug
- 恢复原来的Consumer,继续消费之前的Topic
追问:堆积时间过长消息超时了?
RocketMQ中的消息只会在commitLog被删除的时候才会消失,不会超时。也就是说未被消费的消息不会存在超时删除这情况。追问:堆积的消息会不会进死信队列?
不会,消息在消费失败后会进入重试队列(%RETRY%+ConsumerGroup),18次(默认18次,网上所有文章都说是16次,无一例外。但是我没搞懂为啥是16次,这不是18个时间吗 ?)才会进入死信队列(%DLQ%+ConsumerGroup)。