消息如何保障 100% 的投递成功?
幂等性概念详解
在海量订单产生的业务高峰期,如何避免消息的重复消费问题?
RabbitMQ 投递消息的两种机制:Confirm 确认消息、Return 返回消息
如何自定义消费者?
消息的 ACK 签收与重回队列
消息的限流
TTL 消息
死信队列

1. 生产端的可靠性投递

1.1 什么是生产端的可靠性投递

  • 保障消息的成功发出
  • 保障 MQ 节点成功接受
  • 发送端收到 MQ 节点(Broker)确认应答(已收到)
  • 完善消息进行补偿机制

1.2 生产端可靠性投递的解决方案

通常,小规模的应用可以添加分布式事务保证数据源的一致性,但是在大规模的场景下一般不加事务,而是通过消息补偿机制来保障数据的一致性。

1.2.1 消息落库,对消息状态进行打标

深入 RabbitMQ 高级特性 - 图1
蓝色框:消息生产端
橘色框:RabbitMQ Server

消息持久化到数据库,对消息状态进行打标,如若消息未响应,进行轮询操作。

  • Step1:把业务消息落库 BIZ DB(业务信息数据库),再生成一条消息落库到 MSG DB 用来记录(譬如消息刚创建,正在发送中 status: 0)。(缺点:对数据库进行两次持久化)
  • Step2:生产端发送消息。
  • Step3:Broker 端收到后,应答至生产端。Confirm Listener 异步监听 Broker 的应答。
  • Step4:应答表明消息投递成功后,去 MSG DB 中抓取到指定的消息记录,更新状态,如 status: 1

  • Step5:如在 Step3 中出现网络不稳定等情况,导致 Listener 未收到消息成功确认的应答。那么消息数据库中的 status 就还是0,而 Broker 可能是接收到消息的状态。因此设定一个规则(定时任务),例如消息在落库5分钟后(超时)还是0的状态,就把该条记录抽取出来。

  • Step6:重新投递

  • Step7:限制一个重试的次数,譬如3次,如果大于3次,即为投递失败,更新 status 的值。(用补偿机制去查询消息失败的原因,人工)

:::info 第一种生产端可靠性投递方案,在高并发的场景下是否适合?

  • 在高并发的场景下,方案中数据和消息需要二次落库,这样数据库就会带来瓶颈

如果只将数据落库,消息不落库是不是就更适合高并发的场景呢?

  • 第二种方案就是这种设计,也是目前大多数大厂的方案,不能做到消息投递可靠性100%,却也能在极端的情况下达到 99.9% :::

1.2.2 消息的延迟投递,做二次确认,回调检查

深入 RabbitMQ 高级特性 - 图2
蓝色框 Upstream Service:消息生产端
红色框 Downstream Service:消息消费端
黄色框:RabbitMQ Server 消息集群
灰色框:Callback Service 回调服务

  • Step1:业务消息落库后,发送消息至 Broker。
  • Step2:紧接着(设置延迟时间或5分钟后)发送第二条延迟检查的消息。
  • Step3:消费端监听指定的队列接收到消息进行处理
  • Step4:处理完后,消费端自己生成一条响应消息发送到 Broker。
  • Step5:由 Callback 服务去监听该响应消息,收到该响应消息后持久化至 MSG DB(记录成功状态)。
  • Step6:到了延迟时间,延迟发送的消息也被 Callback 服务的监听器监听到后,去检查 MSG DB。如果未查询到成功的状态,Callback 服务需要做补偿,发起 RPC 通讯,让生产端重新发送。生产端通过介绍到的命令中所带的 id 去数据库查询该业务消息,再重新发送,即跳转到 Step1。

该方案减少了对数据库的存储,保证了性能。 :::info

  • 不加事务,事务会造成严重的性能瓶颈
  • 一定要业务数据持久化到数据库,再发消息
  • 延迟发送的 Delay Check Message 是投递给 Callback 服务的,用来确认消息是否投递成功 :::

2. 消费端的幂等性保障

2.1 幂等性的概念

通俗的说,就是执行 N 次操作的结果是相同的。

借鉴数据库的乐观锁机制。执行一条更新数据库的 SQL 语句:
(避免并发问题,添加一个版本号,执行过减操作后递增 version,就不会重复减)

  1. UPDATE T_REPS SET COUNT = COUNT - 1,VERSION = VERSION + 1
  2. WHERE VERSION = 1

2.2 消费端保障幂等性的解决方案

在海量订单产生的业务高峰期,如何避免消息的重复消费问题?

  • 消费端实现幂等性,就意味着,我们的消息永远不会消费多次,即使我们收到了多条一样的而消息

    2.2.1 唯一 ID + 指纹码 机制

  • 唯一ID + 指纹码(业务规则、时间戳等拼接)机制,利用数据库主键去重

  • SELECT COUNT(1) FROM T_ORDER WHERE ID = 唯一ID + 指纹码,如未查询到就 insert,如有则说明已处理过该消息,返回失败
  • 优点:实现简单
  • 缺点:高并发下有数据库写入的性能瓶颈
    • 解决方案:依靠根据 ID 进行分库分表、算法路由的办法,将单库幂等拆分为多库幂等,进行分压分流

2.2.2 利用 Redis 原子特性实现

  • 使用 Redis 进行幂等,需要考虑的问题
    • 是否要落库数据库,如落库,数据库和缓存如何做到数据的一致性?
    • 如果不落库,数据存储在缓存中,如何设置定时同步的策略(可靠性保障)?

3. Confirm 确认消息机制

3.1 Confirm 消息确认机制的概念

  • 消息的确认,指生产者投递消息后,如果 Broker 收到消息,则会给生产者一个应答。
  • 生产者进行接收应答,用来确认这条消息是否正常发送到Broker,这种方式是消息可靠性投递的核心保障。

3.2 确认机制的流程图

深入 RabbitMQ 高级特性 - 图3
发送消息与监听应答的消息是异步操作。

3.3 Confirm 确认消息的实现

  • 第一步,在 channel 上开启消息确认的投递模式:channel.confirmSelect();
  • 在 channel 添加监听:channel.addConfirmListener(ConfirmListener listener),监听成功和失败的返回结果,根据具体结果对消息进行相应的处理(重新发送、记录日志等待后续处理等)

3.4 代码实现

消息生产端:/api/confirm/Producter

  1. public class Consumer {
  2. public static void main(String[] args) throws Exception{
  3. //1 创建ConnectionFactory
  4. //2 获取Connection
  5. //3 通过Connection创建一个新的Channel
  6. ...
  7. String exchangeName = "test_confirm_exchange";
  8. String routingKey = "confirm.#";
  9. String queueName = "test_confirm_queue";
  10. //4 声明交换机和队列 然后进行绑定设置, 最后制定路由Key
  11. channel.exchangeDeclare(exchangeName, "topic", true);
  12. channel.queueDeclare(queueName, true, false, false, null);
  13. channel.queueBind(queueName, exchangeName, routingKey);
  14. //5 创建消费者
  15. QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
  16. channel.basicConsume(queueName, true, queueingConsumer);
  17. while(true){
  18. Delivery delivery = queueingConsumer.nextDelivery();
  19. String msg = new String(delivery.getBody());
  20. System.err.println("消费端: " + msg);
  21. }
  22. }
  23. }

消息消费端:api/confirm/Consumer

  1. public class Producter {
  2. public static void main(String[] args) throws Exception{
  3. // 1 创建一个ConnectionFactory, 并进行配置
  4. //2 通过连接工厂创建连接
  5. //3 通过connection创建一个Channel
  6. ...
  7. //4 指定我们的消息投递模式: 消息的确认模式
  8. channel.confirmSelect();
  9. String exchangeName = "test_confirm_exchange";
  10. String routingKey = "confirm.save";
  11. //5 发送一条消息
  12. String msg = "Hello RabbitMQ Send confirm message!";
  13. channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
  14. //6 添加一个确认监听
  15. channel.addConfirmListener(new ConfirmListener() {
  16. @Override
  17. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
  18. System.err.println("-------no ack!-----------");
  19. }
  20. @Override
  21. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
  22. System.err.println("-------ack!-----------");
  23. }
  24. });
  25. }
  26. }

4. Return 消息机制

Return Listener 用于处理一些不可路由的消息!

4.1 基础 API

有一个关键配置项:

  • Mandatory:true,则监听器会接收到路由不可达的消息,然后进行处理;false,Broker 会自动删除该消息。默认是 false。

4.2 Return 消息机制流程

深入 RabbitMQ 高级特性 - 图4
我们的消息生产者,通过指定一个 Exchange 和 RoutingKey,把消息送达到某一个队列中去,然后我们的消费者监听队列,进行消费处理操作。

但是在某些情况下,如果我们在发送消息的时候,当前的 exchange 不存在或指定的路由 key 路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用 Return Listener。

4.3 代码实现

消息生产端:/api/returnlistener/Producter

  1. public class Producter {
  2. public static void main(String[] args) throws Exception{
  3. //1 创建一个ConnectionFactory, 并进行配置
  4. //2 通过连接工厂创建连接
  5. //3 通过connection创建一个Channel
  6. ...
  7. String exchange = "test_return_exchange";
  8. String routingKey = "return.save";
  9. String routingKeyError = "abc.save";
  10. String msg = "Hello RabbitMQ Return Message";
  11. channel.addReturnListener(new ReturnListener() {
  12. @Override
  13. public void handleReturn(int replyCode, String replyText, String exchange,
  14. String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
  15. System.err.println("---------handle return----------");
  16. System.err.println("replyCode: " + replyCode);
  17. System.err.println("replyText: " + replyText);
  18. System.err.println("exchange: " + exchange);
  19. System.err.println("routingKey: " + routingKey);
  20. System.err.println("properties: " + properties);
  21. System.err.println("body: " + new String(body));
  22. }
  23. });
  24. channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
  25. }
  26. }

消息消费端:api/returnlistener/Consumer

  1. public class Consumer {
  2. public static void main(String[] args) throws Exception{
  3. ...
  4. String exchangeName = "test_return_exchange";
  5. String routingKey = "return.#";
  6. String queueName = "test_return_queue";
  7. channel.exchangeDeclare(exchangeName, "topic", true, false, null);
  8. channel.queueDeclare(queueName, true, false, false, null);
  9. channel.queueBind(queueName, exchangeName, routingKey);
  10. QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
  11. channel.basicConsume(queueName, true, queueingConsumer);
  12. while(true){
  13. Delivery delivery = queueingConsumer.nextDelivery();
  14. String msg = new String(delivery.getBody());
  15. System.err.println("消费者: " + msg);
  16. }
  17. }
  18. }

运行结果:

  1. ---------handle return----------
  2. replyCode: 312
  3. replyText: NO_ROUTE
  4. exchange: test_return_exchange
  5. routingKey: abc.save
  6. properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
  7. body: Hello RabbitMQ Return Message

5. 消费端自定义监听

我们一般就是在代码中编写 while 循环,进行 consumer.nextDelivery 方法进行获取下一条消息,然后进行消费处理。(不优雅)

我们使用自定义的 Consumer 更加的方便,解耦行更加的强,也是在实际工作中最常用的使用方式。

  • 自定义 Consumer 继承 DefaultConsumer;
  • 根据需求重写 DefaultConsumer 中的方法;

消息生产端:/api/consumer/Producter

  1. public class Producter {
  2. public static void main(String[] args) throws Exception{
  3. ...
  4. String exchange = "test_consumer_exchange";
  5. String routingKey = "consumer.save";
  6. String msg = "Hello RabbitMQ Consumer Message";
  7. for(int i =0; i<5; i ++){
  8. channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
  9. }
  10. }
  11. }

消息消费端:/api/consumer/Consumer

  1. public class Consumer {
  2. public static void main(String[] args) throws Exception{
  3. ...
  4. String exchangeName = "test_consumer_exchange";
  5. String routingKey = "consumer.#";
  6. String queueName = "test_consumer_queue";
  7. channel.exchangeDeclare(exchangeName, "topic", true, false, null);
  8. channel.queueDeclare(queueName, true, false, false, null);
  9. channel.queueBind(queueName, exchangeName, routingKey);
  10. channel.basicConsume(queueName, true, new MyConsumer(channel));
  11. }
  12. }

自定义 Consumer:/api/consumer/MyConsumer

  1. public class MyConsumer extends DefaultConsumer {
  2. public MyConsumer(Channel channel) {
  3. super(channel);
  4. }
  5. @Override
  6. public void handleDelivery(String consumerTag, Envelope envelope,
  7. AMQP.BasicProperties properties, byte[] body) throws IOException {
  8. System.err.println("-----------consume message----------");
  9. System.err.println("consumerTag: " + consumerTag);
  10. System.err.println("envelope: " + envelope);
  11. System.err.println("properties: " + properties);
  12. System.err.println("body: " + new String(body));
  13. }
  14. }

运行结果:

  1. -----------consume message----------
  2. consumerTag: amq.ctag-C4fNlgsOnxDxMA_GI4nWpQ
  3. envelope: Envelope(deliveryTag=1, redeliver=false, exchange=test_consumer_exchange, routingKey=consumer.save)
  4. properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
  5. body: Hello RabbitMQ Consumer Message
  6. -----------consume message----------
  7. consumerTag: amq.ctag-C4fNlgsOnxDxMA_GI4nWpQ
  8. envelope: Envelope(deliveryTag=2, redeliver=false, exchange=test_consumer_exchange, routingKey=consumer.save)
  9. properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
  10. body: Hello RabbitMQ Consumer Message
  11. ...
  12. ...

6. 消费端的限流机制

6.1 消费端限流的概念

当巨量消息瞬间全部推送时,单个客户端无法同时处理这些数据,服务器容易故障。因此要进行消费端限流。

RabbitMQ 提供了一种 Qos(服务质量保证)功能,即在非自动确认前提下,如果一定数目的消息未被确认前(通过 consume 或者 channel 设置 Qos 值),不进行消费新消息。

6.2 代码实现

  1. /**
  2. * Request specific "quality of service" settings.
  3. *
  4. * These settings impose limits on the amount of data the server
  5. * will deliver to consumers before requiring acknowledgements.
  6. * Thus they provide a means of consumer-initiated flow control.
  7. * @see com.rabbitmq.client.AMQP.Basic.Qos
  8. * @param prefetchSize maximum amount of content (measured in
  9. * octets) that the server will deliver, 0 if unlimited
  10. * @param prefetchCount maximum number of messages that the server
  11. * will deliver, 0 if unlimited
  12. * @param global true if the settings should be applied to the
  13. * entire channel rather than each consumer
  14. * @throws java.io.IOException if an error is encountered
  15. */
  16. void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
  • prefetchSize:消息限制大小,一般为0,不做限制。
  • prefetchCount:一次处理消息的个数,一般设置为1。
  • global:一般为 false。true,在 channel 级别做限制;false,在 consumer 级别做限制。

:::info

  • 在实际生产中,不要设置自动确认,要进行手动 ACK。 :::

消息生产端:/api/limit/Producter

  1. public class Producter {
  2. public static void main(String[] args) throws Exception {
  3. ...
  4. String exchange = "test_qos_exchange";
  5. String routingKey = "qos.save";
  6. String msg = "Hello RabbitMQ QOS Message";
  7. for(int i =0; i<5; i ++){
  8. channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
  9. }
  10. }
  11. }

消息消费端:/api/limit/Consumer

  1. public class Consumer {
  2. public static void main(String[] args) throws Exception {
  3. ...
  4. String exchangeName = "test_qos_exchange";
  5. String queueName = "test_qos_queue";
  6. String routingKey = "qos.#";
  7. channel.exchangeDeclare(exchangeName, "topic", true, false, null);
  8. channel.queueDeclare(queueName, true, false, false, null);
  9. channel.queueBind(queueName, exchangeName, routingKey);
  10. //1 限流方式 第一件事就是 autoAck设置为 false
  11. //int prefetchSize, int prefetchCount, boolean global
  12. channel.basicQos(0, 1, false);
  13. // String queue, boolean autoAck = false, Consumer callback
  14. channel.basicConsume(queueName, false, new MyConsumer(channel));
  15. }
  16. }

自定义 Consumer:/api/limit/MyConsumer

  1. public class MyConsumer extends DefaultConsumer {
  2. private Channel channel ;
  3. public MyConsumer(Channel channel) {
  4. super(channel);
  5. this.channel = channel;
  6. }
  7. @Override
  8. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  9. System.err.println("-----------consume message----------");
  10. System.err.println("consumerTag: " + consumerTag);
  11. System.err.println("envelope: " + envelope);
  12. System.err.println("properties: " + properties);
  13. System.err.println("body: " + new String(body));
  14. // long deliveryTag, boolean multiple
  15. channel.basicAck(envelope.getDeliveryTag(), false);
  16. }
  17. }

限流需要设置 channel.basicQos(0, 1, false);,关闭 autoAck,且需要在 MyConsumer 中手动签收。
在重写的 handleDelivery() 方法中,如果没有进行手动签收 channel.basicAck(),那么消费端在接收消息时,因为 prefetchCount 设置为 1,只会接收 1 条消息,剩下的消息的等待中,并不会被推送,直到手动 ack 后。

7. 消费端 ACK 与重回队列机制

7.1 消费端的手工 ACK 和 NACK

消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿。

如果由于服务器宕机等严重问题,那我们就需要手工进行 ACK 来保障消费端消费成功。

7.2 消费端的重回队列

消费端重回队列是为了对没有处理成功的消息,把消息重新回递给 Broker。

一般我们在实际应用中,都会关闭重回队列,也就是设置为 False。

7.3 代码实现

消息生产端:/api/ack/Producter

  1. public class Producter {
  2. public static void main(String[] args) throws Exception {
  3. ...
  4. String exchange = "test_ack_exchange";
  5. String routingKey = "ack.save";
  6. for(int i =0; i<5; i ++){
  7. Map<String, Object> headers = new HashMap<String, Object>();
  8. headers.put("num", i);
  9. AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
  10. .deliveryMode(2)
  11. .contentEncoding("UTF-8")
  12. .headers(headers)
  13. .build();
  14. String msg = "Hello RabbitMQ ACK Message " + i;
  15. //String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body
  16. channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
  17. }
  18. }
  19. }

消息消费端:/api/ack/Consumer

  1. public class Consumer {
  2. public static void main(String[] args) throws Exception {
  3. ...
  4. String exchangeName = "test_ack_exchange";
  5. String queueName = "test_ack_queue";
  6. String routingKey = "ack.#";
  7. channel.exchangeDeclare(exchangeName, "topic", true, false, null);
  8. channel.queueDeclare(queueName, true, false, false, null);
  9. channel.queueBind(queueName, exchangeName, routingKey);
  10. // 手工签收 必须要关闭 autoAck = false
  11. channel.basicConsume(queueName, false, new MyConsumer(channel));
  12. }
  13. }

自定义Consumer:/api/ack/MyConsumer

  1. public class MyConsumer extends DefaultConsumer {
  2. private Channel channel ;
  3. public MyConsumer(Channel channel) {
  4. super(channel);
  5. this.channel = channel;
  6. }
  7. @Override
  8. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  9. System.err.println("-----------consume message----------");
  10. System.err.println("body: " + new String(body));
  11. try {
  12. Thread.sleep(2000);
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. if((Integer)properties.getHeaders().get("num") == 0) {
  17. channel.basicNack(envelope.getDeliveryTag(), false, true);
  18. } else {
  19. channel.basicAck(envelope.getDeliveryTag(), false);
  20. }
  21. }
  22. }

运行结果:

  1. -----------consume message----------
  2. body: Hello RabbitMQ ACK Message 0
  3. -----------consume message----------
  4. body: Hello RabbitMQ ACK Message 1
  5. -----------consume message----------
  6. body: Hello RabbitMQ ACK Message 2
  7. -----------consume message----------
  8. body: Hello RabbitMQ ACK Message 3
  9. -----------consume message----------
  10. body: Hello RabbitMQ ACK Message 4
  11. -----------consume message----------
  12. body: Hello RabbitMQ ACK Message 0
  13. -----------consume message----------
  14. ...

:::info

  • 手动签收必须关闭 autoACK,channel.basicConsume(queueName, autoACK = false, …);
  • basicNack(long deliveryTag, boolean multiple, boolean requeue)**requeue = true** 时启动重回队列机制,即消息手动不签收 NACK 后,消费端将消息重新投递到 Broker,然后再次消费 :::

8. TTL 队列/消息

8.1 TTL 概念

TTL 是 Time to Live 的缩写,也就是生存时间;
RabbitMQ 支持消息的过期时间,在消息发送时可以进行指定;
RabbitMQ 支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除;

8.2 消息中设置 TTL

在代码中进行设置:

  1. AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
  2. .deliveryMode(2)
  3. .expiration("10000") //十秒后自动删除
  4. .build();

8.3 队列中设置 TTL

  • 可以在控制台中,声明队列时设置 TTL 时长:

深入 RabbitMQ 高级特性 - 图5

  • 声明交换机

深入 RabbitMQ 高级特性 - 图6

  • 添加绑定

深入 RabbitMQ 高级特性 - 图7
深入 RabbitMQ 高级特性 - 图8
深入 RabbitMQ 高级特性 - 图9

  • 发送消息

深入 RabbitMQ 高级特性 - 图10
深入 RabbitMQ 高级特性 - 图11
十秒后,因为 TTL 过期,消息消失。

9. 死信队列

9.1 死信队列概念

死信队列:DLX,Dead-Letter-Exchange;

利用 DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新 publish 到另一个 Exchange,这个 Exchange 就是 Dead-Letter-Exchange;

消息变成死信有以下几种情况:

  • 消息被拒绝(basic.reject/ basic.nack)并且 requeue=false(没有重回队列)
  • 消息 TTL 过期
  • 队列达到最大长度

DLX 也是一个正常的 Exchange,和一般的 Exchange 没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。

当这个队列中有死信时,RabbitMQ 就会自动的将这个消息重新发布到设置的 Exchange 上去,进而被路由到另一个队列。

可以监听这个队列中消息做相应的处理。

9.2 死信队列的设置

  • 声明死信队列的 Exchange 和 Queue,然后进行绑定:
    • Exchange: dlx.exchange
    • Queue: dlx.queue
    • RoutingKey: #
  • 然后正常声明我们自己的 Exchange、Queue、Binding,只不过我们需要在队列上设置一个参数:arguments.put("x-dead-letter-exchange", "dlx.exchange");
  • 在消息过期、requeue、队列达到最大长度时(即为死信),消息会发送到指定的 dlx.exchange 交换机上,消费者会监听该交换机所绑定的死信队列。

9.3 代码实现

消息生产端:/api/dlx/Producter

  1. public class Producter {
  2. public static void main(String[] args) throws Exception {
  3. ...
  4. String exchange = "test_dlx_exchange";
  5. String routingKey = "dlx.save";
  6. String msg = "Hello RabbitMQ DLX Message";
  7. for(int i =0; i<1; i ++){
  8. AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
  9. .deliveryMode(2)
  10. .contentEncoding("UTF-8")
  11. .expiration("10000")
  12. .build();
  13. channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
  14. }
  15. }
  16. }

消息消费端:/api/dlx/Consumer

  1. public class Consumer {
  2. public static void main(String[] args) throws Exception {
  3. ...
  4. // 这就是一个普通的交换机 和 队列 以及路由
  5. String exchangeName = "test_dlx_exchange";
  6. String routingKey = "dlx.#";
  7. String queueName = "test_dlx_queue";
  8. channel.exchangeDeclare(exchangeName, "topic", true, false, null);
  9. Map<String, Object> agruments = new HashMap<String, Object>();
  10. agruments.put("x-dead-letter-exchange", "dlx.exchange");
  11. //这个agruments属性,要设置到声明队列上
  12. channel.queueDeclare(queueName, true, false, false, agruments);
  13. channel.queueBind(queueName, exchangeName, routingKey);
  14. //要进行死信队列的声明:
  15. channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
  16. channel.queueDeclare("dlx.queue", true, false, false, null);
  17. channel.queueBind("dlx.queue", "dlx.exchange", "#");
  18. channel.basicConsume(queueName, true, new MyConsumer(channel));
  19. }
  20. }

:::info

  • 先运行 Consumer,在 RabbitMQ Server 创建 Exchange、Queue、DLX Exchange、DLX Queue;
  • 关闭 Consumer;
  • 再运行 Producter,由于 Message Properties 中设置了超时时间,10秒后 Message 变成死信消息;
  • 死信消息被转发到 DLX Queue; :::