怎么解决消息的可靠性?

生产者确认机制
这种机制必须给每个消息指定唯一ID,发送到MQ之后 会返回结果给发送者,消息是否除了成功:
publisher-confirm,发送者确认
消息发送到交换机,返回ack
消息未投递到交换机,返回nack
publisher-return,发送者回执
消发送到交换机,但是没有路由到队列,返回ack及路由失败原因

  1. spring:
  2. rabbitmq:
  3. publisher-confirm-type: correlated # simple 同步 correlated 异步回调返回回调 ConfirmCallback
  4. publisher-returns: true #开启 publish-return功能
  5. template:
  6. mandatory: true #失败策略,true, 调用ReturnCallback ,false :则直接丢弃消息

需要定义配置类 继承ApplicationContextAware
设置回调确认方法

ConfirmCallback 这个方法是确定到达 路由返回消息

rabbitTemplate.setConfirmCallback( 发送消息的id 唯一标识 是否接收掉消息 失败原因
ConfirmCallback() 实现这个方法 confirm(CorrelationData correlationData, boolean ack, String cause)

ReturnCallback 这个方法是 确定到达消息队列 路由返回消息
/*
@param message 返还的消息内容
@param replyCode 回复状态码
@param replyText 回复内容
@param exchange 交换机
@param routingKey 路由
*/
returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey)

mq持久化
默认情况下,SpringAMQP发出的任何消息都是持久化的,不用特意指定

  1. //交换机持久化
  2. @Bean
  3. public DirectExchange simpleExchange(){
  4. // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
  5. return new DirectExchange("simple.direct", true, false);
  6. }
  7. //队列持久化
  8. @Bean
  9. public Queue simpleQueue(){
  10. // 使用QueueBuilder构建队列,durable就是持久化的
  11. return QueueBuilder.durable("simple.queue").build();
  12. }
  13. //消息持久化
  14. Message msg = MessageBuilder
  15. .withBody(message.getBytes(StandardCharsets.UTF_8)) // 消息体
  16. .setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化
  17. .build();

消费者确认机制

pringAMQP则允许配置三种确认模式
manual:手动 ack 需要代码结束后 调用api发送ack
auto: 自动 由spring检测到listener代码是否出现异常,出现异常返回ack,抛出异常返回nack
none: 关闭ack 模式下,消息投递是不可靠的,可能丢失
一般,都是使用默认的auto即可

  1. spring:
  2. rabbitmq:
  3. listener:
  4. simple:
  5. acknowledge-mode: none # 关闭ack auto 自动 manual 手动

auto状态下 会不断的重试机制 无限循环

  1. spring:
  2. rabbitmq:
  3. listener:
  4. simple:
  5. retry:
  6. enabled: true # 开启消费者失败重试
  7. initial-interval: 1000ms # 初识的失败等待时长为1秒
  8. multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
  9. max-attempts: 3 # 最大重试次数
  10. stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

达到最大重试机制之后 消息会被丢弃
失败策略:重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机 ★★★

比较优秀的处理方案是 RepublishMessageRecoverer 失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

什么是死信交换机?
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消息被消费者reject或者返回nack
  • 超时未消费
  • 队列消息满了

死信交换机的使用场景:

  • 如果队列绑定了死信交换机,死信会投递到死信交换机;
  • 可以利用死信交换机收集所有消费者处理失败的消息(死信),交由人工处理,进一步提高消息队列的可靠性。

TTL
消息超时的两种方式:

  • 消息所在的队列设置了超时时间 给队列设置ttl属性,进入队列后超过ttl时间的消息变为死信
  • 消息本身设置了超时时间 给消息设置ttl属性,队列接收到消息超过ttl时间后变为死信
  • 失败重试机制

高可用性:
延迟消息问题:
消息堆积问题: