怎么解决消息的可靠性?
生产者确认机制
这种机制必须给每个消息指定唯一ID,发送到MQ之后 会返回结果给发送者,消息是否除了成功:
publisher-confirm,发送者确认
消息发送到交换机,返回ack
消息未投递到交换机,返回nack
publisher-return,发送者回执
消发送到交换机,但是没有路由到队列,返回ack及路由失败原因
spring:
rabbitmq:
publisher-confirm-type: correlated # simple 同步 correlated 异步回调返回回调 ConfirmCallback
publisher-returns: true #开启 publish-return功能
template:
mandatory: true #失败策略,true, 调用ReturnCallback ,false :则直接丢弃消息
需要定义配置类 继承ApplicationContextAware
设置回调确认方法
ConfirmCallback 这个方法是确定到达 路由返回消息
rabbitTemplate.setConfirmCallback( 发送消息的id 唯一标识 是否接收掉消息 失败原因
ConfirmCallback() 实现这个方法 confirm(CorrelationData correlationData, boolean ack, String cause)
)
ReturnCallback 这个方法是 确定到达消息队列 路由返回消息
/*
@param message 返还的消息内容
@param replyCode 回复状态码
@param replyText 回复内容
@param exchange 交换机
@param routingKey 路由
*/
returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey)
mq持久化
默认情况下,SpringAMQP发出的任何消息都是持久化的,不用特意指定
//交换机持久化
@Bean
public DirectExchange simpleExchange(){
// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
return new DirectExchange("simple.direct", true, false);
}
//队列持久化
@Bean
public Queue simpleQueue(){
// 使用QueueBuilder构建队列,durable就是持久化的
return QueueBuilder.durable("simple.queue").build();
}
//消息持久化
Message msg = MessageBuilder
.withBody(message.getBytes(StandardCharsets.UTF_8)) // 消息体
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化
.build();
消费者确认机制
pringAMQP则允许配置三种确认模式
manual:手动 ack 需要代码结束后 调用api发送ack
auto: 自动 由spring检测到listener代码是否出现异常,出现异常返回ack,抛出异常返回nack
none: 关闭ack 模式下,消息投递是不可靠的,可能丢失
一般,都是使用默认的auto即可
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 关闭ack auto 自动 manual 手动
auto状态下 会不断的重试机制 无限循环
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
达到最大重试机制之后 消息会被丢弃
失败策略:重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机 ★★★
比较优秀的处理方案是 RepublishMessageRecoverer 失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
什么是死信交换机?
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
- 消息被消费者reject或者返回nack
- 超时未消费
- 队列消息满了
死信交换机的使用场景:
- 如果队列绑定了死信交换机,死信会投递到死信交换机;
- 可以利用死信交换机收集所有消费者处理失败的消息(死信),交由人工处理,进一步提高消息队列的可靠性。
TTL
消息超时的两种方式:
- 消息所在的队列设置了超时时间 给队列设置ttl属性,进入队列后超过ttl时间的消息变为死信
- 消息本身设置了超时时间 给消息设置ttl属性,队列接收到消息超过ttl时间后变为死信
- 失败重试机制
高可用性:
延迟消息问题:
消息堆积问题: