消息如何保障 100% 的投递成功?
幂等性概念详解
在海量订单产生的业务高峰期,如何避免消息的重复消费问题?
RabbitMQ 投递消息的两种机制:Confirm 确认消息、Return 返回消息
如何自定义消费者?
消息的 ACK 签收与重回队列
消息的限流
TTL 消息
死信队列
1. 生产端的可靠性投递
1.1 什么是生产端的可靠性投递
- 保障消息的成功发出
- 保障 MQ 节点成功接受
- 发送端收到 MQ 节点(Broker)确认应答(已收到)
- 完善消息进行补偿机制
1.2 生产端可靠性投递的解决方案
通常,小规模的应用可以添加分布式事务保证数据源的一致性,但是在大规模的场景下一般不加事务,而是通过消息补偿机制来保障数据的一致性。
1.2.1 消息落库,对消息状态进行打标
蓝色框:消息生产端
橘色框:RabbitMQ Server
消息持久化到数据库,对消息状态进行打标,如若消息未响应,进行轮询操作。
- Step1:把业务消息落库 BIZ DB(业务信息数据库),再生成一条消息落库到 MSG DB 用来记录(譬如消息刚创建,正在发送中 status: 0)。(缺点:对数据库进行两次持久化)
- Step2:生产端发送消息。
- Step3:Broker 端收到后,应答至生产端。Confirm Listener 异步监听 Broker 的应答。
Step4:应答表明消息投递成功后,去 MSG DB 中抓取到指定的消息记录,更新状态,如 status: 1
Step5:如在 Step3 中出现网络不稳定等情况,导致 Listener 未收到消息成功确认的应答。那么消息数据库中的 status 就还是0,而 Broker 可能是接收到消息的状态。因此设定一个规则(定时任务),例如消息在落库5分钟后(超时)还是0的状态,就把该条记录抽取出来。
Step6:重新投递
- Step7:限制一个重试的次数,譬如3次,如果大于3次,即为投递失败,更新 status 的值。(用补偿机制去查询消息失败的原因,人工)
:::info 第一种生产端可靠性投递方案,在高并发的场景下是否适合?
- 在高并发的场景下,方案中数据和消息需要二次落库,这样数据库就会带来瓶颈
如果只将数据落库,消息不落库是不是就更适合高并发的场景呢?
- 第二种方案就是这种设计,也是目前大多数大厂的方案,不能做到消息投递可靠性100%,却也能在极端的情况下达到 99.9% :::
1.2.2 消息的延迟投递,做二次确认,回调检查
蓝色框 Upstream Service:消息生产端
红色框 Downstream Service:消息消费端
黄色框:RabbitMQ Server 消息集群
灰色框:Callback Service 回调服务
- Step1:业务消息落库后,发送消息至 Broker。
- Step2:紧接着(设置延迟时间或5分钟后)发送第二条延迟检查的消息。
- Step3:消费端监听指定的队列接收到消息进行处理
- Step4:处理完后,消费端自己生成一条响应消息发送到 Broker。
- Step5:由 Callback 服务去监听该响应消息,收到该响应消息后持久化至 MSG DB(记录成功状态)。
- Step6:到了延迟时间,延迟发送的消息也被 Callback 服务的监听器监听到后,去检查 MSG DB。如果未查询到成功的状态,Callback 服务需要做补偿,发起 RPC 通讯,让生产端重新发送。生产端通过介绍到的命令中所带的 id 去数据库查询该业务消息,再重新发送,即跳转到 Step1。
该方案减少了对数据库的存储,保证了性能。 :::info
- 不加事务,事务会造成严重的性能瓶颈
- 一定要业务数据持久化到数据库,再发消息
- 延迟发送的 Delay Check Message 是投递给 Callback 服务的,用来确认消息是否投递成功 :::
2. 消费端的幂等性保障
2.1 幂等性的概念
通俗的说,就是执行 N 次操作的结果是相同的。
借鉴数据库的乐观锁机制。执行一条更新数据库的 SQL 语句:
(避免并发问题,添加一个版本号,执行过减操作后递增 version,就不会重复减)
UPDATE T_REPS SET COUNT = COUNT - 1,VERSION = VERSION + 1
WHERE VERSION = 1
2.2 消费端保障幂等性的解决方案
在海量订单产生的业务高峰期,如何避免消息的重复消费问题?
消费端实现幂等性,就意味着,我们的消息永远不会消费多次,即使我们收到了多条一样的而消息
2.2.1 唯一 ID + 指纹码 机制
唯一ID + 指纹码(业务规则、时间戳等拼接)机制,利用数据库主键去重
SELECT COUNT(1) FROM T_ORDER WHERE ID = 唯一ID + 指纹码
,如未查询到就 insert,如有则说明已处理过该消息,返回失败- 优点:实现简单
- 缺点:高并发下有数据库写入的性能瓶颈
- 解决方案:依靠根据 ID 进行分库分表、算法路由的办法,将单库幂等拆分为多库幂等,进行分压分流
2.2.2 利用 Redis 原子特性实现
- 使用 Redis 进行幂等,需要考虑的问题
- 是否要落库数据库,如落库,数据库和缓存如何做到数据的一致性?
- 如果不落库,数据存储在缓存中,如何设置定时同步的策略(可靠性保障)?
3. Confirm 确认消息机制
3.1 Confirm 消息确认机制的概念
- 消息的确认,指生产者投递消息后,如果 Broker 收到消息,则会给生产者一个应答。
- 生产者进行接收应答,用来确认这条消息是否正常发送到Broker,这种方式是消息可靠性投递的核心保障。
3.2 确认机制的流程图
发送消息与监听应答的消息是异步操作。
3.3 Confirm 确认消息的实现
- 第一步,在 channel 上开启消息确认的投递模式:
channel.confirmSelect()
; - 在 channel 添加监听:
channel.addConfirmListener(ConfirmListener listener)
,监听成功和失败的返回结果,根据具体结果对消息进行相应的处理(重新发送、记录日志等待后续处理等)
3.4 代码实现
消息生产端:/api/confirm/Producter
public class Consumer {
public static void main(String[] args) throws Exception{
//1 创建ConnectionFactory
//2 获取Connection
//3 通过Connection创建一个新的Channel
...
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.#";
String queueName = "test_confirm_queue";
//4 声明交换机和队列 然后进行绑定设置, 最后制定路由Key
channel.exchangeDeclare(exchangeName, "topic", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//5 创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
while(true){
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消费端: " + msg);
}
}
}
消息消费端:api/confirm/Consumer
public class Producter {
public static void main(String[] args) throws Exception{
// 1 创建一个ConnectionFactory, 并进行配置
//2 通过连接工厂创建连接
//3 通过connection创建一个Channel
...
//4 指定我们的消息投递模式: 消息的确认模式
channel.confirmSelect();
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.save";
//5 发送一条消息
String msg = "Hello RabbitMQ Send confirm message!";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
//6 添加一个确认监听
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------no ack!-----------");
}
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------ack!-----------");
}
});
}
}
4. Return 消息机制
Return Listener 用于处理一些不可路由的消息!
4.1 基础 API
有一个关键配置项:
Mandatory
:true,则监听器会接收到路由不可达的消息,然后进行处理;false,Broker 会自动删除该消息。默认是 false。
4.2 Return 消息机制流程
我们的消息生产者,通过指定一个 Exchange 和 RoutingKey,把消息送达到某一个队列中去,然后我们的消费者监听队列,进行消费处理操作。
但是在某些情况下,如果我们在发送消息的时候,当前的 exchange 不存在或指定的路由 key 路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用 Return Listener。
4.3 代码实现
消息生产端:/api/returnlistener/Producter
public class Producter {
public static void main(String[] args) throws Exception{
//1 创建一个ConnectionFactory, 并进行配置
//2 通过连接工厂创建连接
//3 通过connection创建一个Channel
...
String exchange = "test_return_exchange";
String routingKey = "return.save";
String routingKeyError = "abc.save";
String msg = "Hello RabbitMQ Return Message";
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange,
String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("---------handle return----------");
System.err.println("replyCode: " + replyCode);
System.err.println("replyText: " + replyText);
System.err.println("exchange: " + exchange);
System.err.println("routingKey: " + routingKey);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
});
channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
}
}
消息消费端:api/returnlistener/Consumer
public class Consumer {
public static void main(String[] args) throws Exception{
...
String exchangeName = "test_return_exchange";
String routingKey = "return.#";
String queueName = "test_return_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
while(true){
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消费者: " + msg);
}
}
}
运行结果:
---------handle return----------
replyCode: 312
replyText: NO_ROUTE
exchange: test_return_exchange
routingKey: abc.save
properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body: Hello RabbitMQ Return Message
5. 消费端自定义监听
我们一般就是在代码中编写 while 循环,进行 consumer.nextDelivery 方法进行获取下一条消息,然后进行消费处理。(不优雅)
我们使用自定义的 Consumer 更加的方便,解耦行更加的强,也是在实际工作中最常用的使用方式。
- 自定义 Consumer 继承 DefaultConsumer;
- 根据需求重写 DefaultConsumer 中的方法;
消息生产端:/api/consumer/Producter
public class Producter {
public static void main(String[] args) throws Exception{
...
String exchange = "test_consumer_exchange";
String routingKey = "consumer.save";
String msg = "Hello RabbitMQ Consumer Message";
for(int i =0; i<5; i ++){
channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
}
}
}
消息消费端:/api/consumer/Consumer
public class Consumer {
public static void main(String[] args) throws Exception{
...
String exchangeName = "test_consumer_exchange";
String routingKey = "consumer.#";
String queueName = "test_consumer_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
channel.basicConsume(queueName, true, new MyConsumer(channel));
}
}
自定义 Consumer:/api/consumer/MyConsumer
public class MyConsumer extends DefaultConsumer {
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
}
运行结果:
-----------consume message----------
consumerTag: amq.ctag-C4fNlgsOnxDxMA_GI4nWpQ
envelope: Envelope(deliveryTag=1, redeliver=false, exchange=test_consumer_exchange, routingKey=consumer.save)
properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body: Hello RabbitMQ Consumer Message
-----------consume message----------
consumerTag: amq.ctag-C4fNlgsOnxDxMA_GI4nWpQ
envelope: Envelope(deliveryTag=2, redeliver=false, exchange=test_consumer_exchange, routingKey=consumer.save)
properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body: Hello RabbitMQ Consumer Message
...
...
6. 消费端的限流机制
6.1 消费端限流的概念
当巨量消息瞬间全部推送时,单个客户端无法同时处理这些数据,服务器容易故障。因此要进行消费端限流。
RabbitMQ 提供了一种 Qos(服务质量保证)功能,即在非自动确认前提下,如果一定数目的消息未被确认前(通过 consume 或者 channel 设置 Qos 值),不进行消费新消息。
6.2 代码实现
/**
* Request specific "quality of service" settings.
*
* These settings impose limits on the amount of data the server
* will deliver to consumers before requiring acknowledgements.
* Thus they provide a means of consumer-initiated flow control.
* @see com.rabbitmq.client.AMQP.Basic.Qos
* @param prefetchSize maximum amount of content (measured in
* octets) that the server will deliver, 0 if unlimited
* @param prefetchCount maximum number of messages that the server
* will deliver, 0 if unlimited
* @param global true if the settings should be applied to the
* entire channel rather than each consumer
* @throws java.io.IOException if an error is encountered
*/
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
prefetchSize
:消息限制大小,一般为0,不做限制。prefetchCount
:一次处理消息的个数,一般设置为1。global
:一般为 false。true,在 channel 级别做限制;false,在 consumer 级别做限制。
:::info
- 在实际生产中,不要设置自动确认,要进行手动 ACK。 :::
消息生产端:/api/limit/Producter
public class Producter {
public static void main(String[] args) throws Exception {
...
String exchange = "test_qos_exchange";
String routingKey = "qos.save";
String msg = "Hello RabbitMQ QOS Message";
for(int i =0; i<5; i ++){
channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
}
}
}
消息消费端:/api/limit/Consumer
public class Consumer {
public static void main(String[] args) throws Exception {
...
String exchangeName = "test_qos_exchange";
String queueName = "test_qos_queue";
String routingKey = "qos.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//1 限流方式 第一件事就是 autoAck设置为 false
//int prefetchSize, int prefetchCount, boolean global
channel.basicQos(0, 1, false);
// String queue, boolean autoAck = false, Consumer callback
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}
自定义 Consumer:/api/limit/MyConsumer
public class MyConsumer extends DefaultConsumer {
private Channel channel ;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
// long deliveryTag, boolean multiple
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
限流需要设置 channel.basicQos(0, 1, false);
,关闭 autoAck,且需要在 MyConsumer 中手动签收。
在重写的 handleDelivery()
方法中,如果没有进行手动签收 channel.basicAck()
,那么消费端在接收消息时,因为 prefetchCount
设置为 1,只会接收 1 条消息,剩下的消息的等待中,并不会被推送,直到手动 ack 后。
7. 消费端 ACK 与重回队列机制
7.1 消费端的手工 ACK 和 NACK
消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿。
如果由于服务器宕机等严重问题,那我们就需要手工进行 ACK 来保障消费端消费成功。
7.2 消费端的重回队列
消费端重回队列是为了对没有处理成功的消息,把消息重新回递给 Broker。
一般我们在实际应用中,都会关闭重回队列,也就是设置为 False。
7.3 代码实现
消息生产端:/api/ack/Producter
public class Producter {
public static void main(String[] args) throws Exception {
...
String exchange = "test_ack_exchange";
String routingKey = "ack.save";
for(int i =0; i<5; i ++){
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("num", i);
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(headers)
.build();
String msg = "Hello RabbitMQ ACK Message " + i;
//String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body
channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
}
}
}
消息消费端:/api/ack/Consumer
public class Consumer {
public static void main(String[] args) throws Exception {
...
String exchangeName = "test_ack_exchange";
String queueName = "test_ack_queue";
String routingKey = "ack.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
// 手工签收 必须要关闭 autoAck = false
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}
自定义Consumer:/api/ack/MyConsumer
public class MyConsumer extends DefaultConsumer {
private Channel channel ;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("body: " + new String(body));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if((Integer)properties.getHeaders().get("num") == 0) {
channel.basicNack(envelope.getDeliveryTag(), false, true);
} else {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}
运行结果:
-----------consume message----------
body: Hello RabbitMQ ACK Message 0
-----------consume message----------
body: Hello RabbitMQ ACK Message 1
-----------consume message----------
body: Hello RabbitMQ ACK Message 2
-----------consume message----------
body: Hello RabbitMQ ACK Message 3
-----------consume message----------
body: Hello RabbitMQ ACK Message 4
-----------consume message----------
body: Hello RabbitMQ ACK Message 0
-----------consume message----------
...
:::info
- 手动签收必须关闭 autoACK,channel.basicConsume(queueName, autoACK = false, …);
basicNack(long deliveryTag, boolean multiple, boolean requeue)
,**requeue = true**
时启动重回队列机制,即消息手动不签收 NACK 后,消费端将消息重新投递到 Broker,然后再次消费 :::
8. TTL 队列/消息
8.1 TTL 概念
TTL 是 Time to Live 的缩写,也就是生存时间;
RabbitMQ 支持消息的过期时间,在消息发送时可以进行指定;
RabbitMQ 支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除;
8.2 消息中设置 TTL
在代码中进行设置:
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.expiration("10000") //十秒后自动删除
.build();
8.3 队列中设置 TTL
- 可以在控制台中,声明队列时设置 TTL 时长:
- 声明交换机
- 添加绑定
- 发送消息:
十秒后,因为 TTL 过期,消息消失。
9. 死信队列
9.1 死信队列概念
死信队列:DLX,Dead-Letter-Exchange;
利用 DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新 publish 到另一个 Exchange,这个 Exchange 就是 Dead-Letter-Exchange;
消息变成死信有以下几种情况:
- 消息被拒绝(
basic.reject/ basic.nack
)并且requeue=false
(没有重回队列) - 消息 TTL 过期
- 队列达到最大长度
DLX 也是一个正常的 Exchange,和一般的 Exchange 没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
当这个队列中有死信时,RabbitMQ 就会自动的将这个消息重新发布到设置的 Exchange 上去,进而被路由到另一个队列。
可以监听这个队列中消息做相应的处理。
9.2 死信队列的设置
- 声明死信队列的 Exchange 和 Queue,然后进行绑定:
Exchange: dlx.exchange
Queue: dlx.queue
RoutingKey: #
- 然后正常声明我们自己的 Exchange、Queue、Binding,只不过我们需要在队列上设置一个参数:
arguments.put("x-dead-letter-exchange", "dlx.exchange");
- 在消息过期、requeue、队列达到最大长度时(即为死信),消息会发送到指定的 dlx.exchange 交换机上,消费者会监听该交换机所绑定的死信队列。
9.3 代码实现
消息生产端:/api/dlx/Producter
public class Producter {
public static void main(String[] args) throws Exception {
...
String exchange = "test_dlx_exchange";
String routingKey = "dlx.save";
String msg = "Hello RabbitMQ DLX Message";
for(int i =0; i<1; i ++){
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("10000")
.build();
channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
}
}
}
消息消费端:/api/dlx/Consumer
public class Consumer {
public static void main(String[] args) throws Exception {
...
// 这就是一个普通的交换机 和 队列 以及路由
String exchangeName = "test_dlx_exchange";
String routingKey = "dlx.#";
String queueName = "test_dlx_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
Map<String, Object> agruments = new HashMap<String, Object>();
agruments.put("x-dead-letter-exchange", "dlx.exchange");
//这个agruments属性,要设置到声明队列上
channel.queueDeclare(queueName, true, false, false, agruments);
channel.queueBind(queueName, exchangeName, routingKey);
//要进行死信队列的声明:
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#");
channel.basicConsume(queueName, true, new MyConsumer(channel));
}
}
:::info
- 先运行 Consumer,在 RabbitMQ Server 创建 Exchange、Queue、DLX Exchange、DLX Queue;
- 关闭 Consumer;
- 再运行 Producter,由于 Message Properties 中设置了超时时间,10秒后 Message 变成死信消息;
- 死信消息被转发到 DLX Queue; :::