Spring提供了整合Rabbit的依赖。

Spring 整合RabbitMQ

生产端工程

1. 添加依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.amqp</groupId>
  4. <artifactId>spring-rabbit</artifactId>
  5. <version>2.0.1.RELEASE</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.slf4j</groupId>
  9. <artifactId>slf4j-log4j12</artifactId>
  10. <version>1.7.25</version>
  11. <scope>compile</scope>
  12. </dependency>
  13. <dependency>
  14. <groupId>org.apache.commons</groupId>
  15. <artifactId>commons-lang3</artifactId>
  16. <version>3.9</version>
  17. </dependency>
  18. </dependencies>

2. spring-rabbitmq-producer.xml

  1. <?xml version="1.0" encoding="UTF-8" ?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:context="http://www.springframework.org/schema/context"
  4. xmlns:tx="http://www.springframework.org/schema/tx"
  5. xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  6. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  7. xsi:schemaLocation="http://www.springframework.org/schema/beans
  8. http://www.springframework.org/schema/beans/spring-beans.xsd
  9. http://www.springframework.org/schema/context
  10. http://www.springframework.org/schema/context/spring-context.xsd
  11. http://www.springframework.org/schema/tx
  12. http://www.springframework.org/schema/tx/spring-tx.xsd
  13. http://www.springframework.org/schema/rabbit
  14. http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
  15. ">
  16. <!-- 1.配置连接Rabbit工厂 -->
  17. <rabbit:connection-factory id="connectionFactory" host="172.16.150.130" port="5672" username="prim"
  18. password="123456" virtual-host="/edu"/>
  19. <!-- 2. 配置队列 -->
  20. <rabbit:queue name="test_spring_queue_1"/>
  21. <!-- 3. 配置RabbitAdmin:主要用于在Java代码中对队列的管理,用来创建、绑定、删除队列与交换机,发送消息等操作 -->
  22. <rabbit:admin connection-factory="connectionFactory"/>
  23. <!-- 4. 配置交换机/路由 topic类型 -->
  24. <rabbit:topic-exchange name="spring_topic_exchange">
  25. <!-- 绑定队列 -->
  26. <rabbit:bindings>
  27. <!-- pattern通配符 -->
  28. <rabbit:binding pattern="msg.#" queue="test_spring_queue_1"/>
  29. </rabbit:bindings>
  30. </rabbit:topic-exchange>
  31. <!-- 5. 配置json转换的工具:将消息转换成json的工具 -->
  32. <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>
  33. <!-- 6. 配置RabbitMQ模板 -->
  34. <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="spring_topic_exchange"
  35. message-converter="jsonMessageConverter"/>
  36. </beans>

3. 发送消息

    public static void main(String[] args) {
        //1. 创建spring容器
        ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");
        //2. 从容器中获得Rabbit模板对象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        //3. 发送消息
        Map<String, String> map = new HashMap<>();
        map.put("name", "prim");
        map.put("email", "111@qq.com");
        rabbitTemplate.convertAndSend("msg.user", map);
        applicationContext.close();
    }

消费端工程

创建maven,依赖和生产端工程一致。

1. 消费端配置

<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/tx
        http://www.springframework.org/schema/tx/spring-tx.xsd
        http://www.springframework.org/schema/rabbit
        http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
">
    <!--  1.配置连接Rabbit工厂   -->
    <rabbit:connection-factory id="connectionFactory" host="172.16.150.130" port="5672" username="prim"
                               password="123456" virtual-host="/edu"/>
    <!--  2. 配置队列  -->
    <rabbit:queue name="test_spring_queue_1"/>
    <!--  3. 配置RabbitAdmin:主要用于在Java代码中对队列的管理,用来创建、绑定、删除队列与交换机,发送消息等操作  -->
    <rabbit:admin connection-factory="connectionFactory"/>
    <!--  4. 注解扫描 -->
    <context:component-scan base-package="listener"/>
    <!--  5. 配置监听 -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener ref="consumerListener" queue-names="test_spring_queue_1"/>
    </rabbit:listener-container>

</beans>

2. 添加监听类

/**
 * @program: spring-rabbit-consumer
 * @Description: 消费者队列
 * @author: sufulu
 * @version: 1.0.0
 * @create: 2021-02-02 22:36
 * @PackageName: listener
 * @ClassName: ConsumerListener.java
 **/
@Component
public class ConsumerListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
// 将Message对象转换成json
        try {
            JsonNode jsonNode = MAPPER.readTree(message.getBody());
            String name = jsonNode.get("name").asText();
            String email = jsonNode.get("email").asText();
            System.out.println("从队列中获取:" + name + "的邮箱是:" + email);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

3. 编写测试类:

加载spring容器,让程序一直运行,监听生产端的消息

public class TestRunner {
    public static void main(String[] args) throws IOException {
        ClassPathXmlApplicationContext applicationContext =
                new ClassPathXmlApplicationContext("spring/spring-rabbitmq-consumer.xml");
        System.in.read();//程序一直运行
    }
}

生产端,运行发送消息,查看消费端是否可以接收到消息?
消费端可以接收到消息,这样我们就实现了两个工程之间的通信
image.png

消息成功确认机制

在实际场景下,有的生产者发送的消息是必须保证成功发送到消息队列中,那么如何保证成功投递呢?

  • 事务机制
  • 发布确认(推荐使用)

事务机制

  • AMQP协议提供的一种保证消息成功投递的方式,通过信道开启transactional模式
  • 并利用信道的三个方法来实现以事务方式发送消息,若发送失败,通过异常处理回滚事务,确保消息成功投递
    • channel.txSelect()开启事务
    • channel.txCommit()提交事务
    • channel.txRollback()回滚事务

示例代码如下:

  • 生产者 ```java public class Sender { public static void main(String[] args) throws Exception {
      Connection connection = ConnectionUtils.getConnection();
      Channel channel = connection.createChannel();
    
    // channel.queueDeclare(“test_work_queue”, false, false, false, null);
      //声明路由,创建网红主播
      //第一个参数:路由名称
      //第二个参数:路由类型 一共有四种。
      //第三个参数:表示否是持久化路由
      //direct:根据路由键进行定向分发消息
      //topic:模糊匹配的定向分发
      channel.exchangeDeclare("text_transaction_topic", "topic");
      try {
          //开启事务
          channel.txSelect();
          //推消息到路由器
          //第二个参数必填,路由键
          channel.basicPublish("text_transaction_topic", "order.down", null, "商品1-降价".getBytes());
          int i = 1 / 0;//出现了异常会导致:成功的成功,失败的失败,加入事务要么全部成功,要么全部失败
          channel.basicPublish("text_transaction_topic", "order.down", null, "商品2-降价".getBytes());
          //提交事务
          channel.txCommit();
          System.out.println("[消息已发送]");
      } catch (IOException e) {
          System.out.println("消息全部撤销");
          //回滚事务
          channel.txRollback();
      } finally {
          //5. 释放资源
          channel.close();
          connection.close();
      }
    
    } }

- 消费者
```java
public class Recer1 {
    public static void main(String[] args) throws Exception {
        //1. 获得连接
        Connection connection = ConnectionUtils.getConnection();
        //2. 获得通道
        Channel channel = connection.createChannel();
        //queueDeclare 该方法有双重作用,如果队列不存在,则创建;如果队列存在,则获取
        //声明队列
        channel.queueDeclare("test_transaction_queue", false, false, false, null);
        /**
         * 绑定用户相关的消息:user.#
         * 参数1:队列名
         * 参数2:路由名
         * 参数3:路由键
         */
        channel.queueBind("test_transaction_queue", "text_transaction_topic", "order.#");
        //3. 从信道中获得消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("【消费者1】:" + s);
            }
        };
        //4. 监听队列
        //第二个参数:true 表示自动消息确认;false 手动确认消息
        channel.basicConsume("test_transaction_queue", true, consumer);
    }
}

Confirm发布确认机制

  • RabbitMQ为了保证消息的成功投递,采用通过AMQP协议层面为我们提供事务机制的方案,但是采用事务会大大降低消息的吞吐量
  • 事务效率低,为什么呢?假如10条消息,前9条成功了,如果第10条失败了,那么9条消息要全部撤销回滚,太浪费了
  • Confirm模式则采用补发第10条消息的措施来完成10条消息的送达
  1. 在spring中的应用:spring-rabbitmq-producer.xml 添加publisher-confirms="true" 启动生产者确认机制

    <!--  1.配置连接Rabbit工厂   -->
     <rabbit:connection-factory 
                                id="connectionFactory" 
                                host="172.16.150.130" 
                                port="5672" 
                                username="prim"
                                password="123456" 
                                virtual-host="/edu" 
                                publisher-confirms="true"/>
     <!--  2. 配置队列  -->
     <rabbit:queue name="test_spring_queue_1"/>
     <!--  3. 配置RabbitAdmin:主要用于在Java代码中对队列的管理,用来创建、绑定、删除队列与交换机,发送消息等操作  -->
     <rabbit:admin connection-factory="connectionFactory"/>
     <!--  4. 配置交换机/路由 topic类型 -->
     <rabbit:topic-exchange name="spring_topic_exchange">
         <!-- 绑定队列 -->
         <rabbit:bindings>
             <!-- pattern通配符 -->
             <rabbit:binding pattern="msg.#" queue="test_spring_queue_1"/>
         </rabbit:bindings>
     </rabbit:topic-exchange>
     <!-- 5. 配置json转换的工具:将消息转换成json的工具 -->
     <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>
     <!-- 6. 配置RabbitMQ模板 -->
     <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="spring_topic_exchange"
                      message-converter="jsonMessageConverter"
                      confirm-callback="messageConfirm"/>
     <!-- 7. 确认机制处理类 -->
     <bean id="messageConfirm" class="confirm.MessageConfirm"/>
    
  2. 消息确认处理类

    /**
    * @program: spring-rabbitmq-producer
    * @Description: 消息确认处理
    * @author: sufulu
    * @version: 1.0.0
    * @create: 2021-02-02 23:48
    * @PackageName: confirm
    * @ClassName: MessageConfirm.java
    **/
    public class MessageConfirm implements RabbitTemplate.ConfirmCallback {
     /***
      * 
      * @param correlationData 消息相关的数据对象,封装了消息的唯一id
      * @param b 消息是否确认成功
      * @param s 异常信息
      */
     @Override
     public void confirm(CorrelationData correlationData, boolean b, String s) {
         if (b) {
             System.out.println("消息确认成功");
         } else {
             System.out.println("消息确认失败xxxx");
             //补发 失败的消息,如果本条消息一定要发送到队列中,例如下订单消息,我们可以采用补发
             //1. 采用递归 限制递归的次数
    
             //2. redis+定时任务(JDK的timer,或者定时任务框架Quartz)
         }
     }
    }
    
  3. 测试发送失败消息,如下代码,发送到一个不存在的路由,那么消息肯定会失败

    public class Sender {
     public static void main(String[] args) {
         //1. 创建spring容器
         ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");
         //2. 从容器中获得Rabbit模板对象
         RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
         //3. 发送消息
         Map<String, String> map = new HashMap<>();
         map.put("name", "吕布");
         map.put("email", "666@qq.com");
         //将消息发送一个不存在的路由 测试消息发送失败
         rabbitTemplate.convertAndSend("lalal","msg.user", map);
         System.out.println("[消息已发出]");
         applicationContext.close();
     }
    }
    

    image.png
    我们可以在消息确认类中补发消息:
    补发 失败的消息,如果本条消息一定要发送到队列中,例如下订单消息,我们可以采用补发
    1. 采用递归 限制递归的次数
    2. redis+定时任务(JDK的timer,或者定时任务框架Quartz)

消费端限流

在RabbitMQ服务器积压了成千上万条未处理的消息,然后随便打开一个消费者客户端,就会出现:巨量的消息瞬间全部喷涌推送过来,但是单个客户端无法通知处理这么多数据就会被压垮崩溃。 所以在数据量特别大的时候,我们对生产端限流肯定不行的,因为有时候并发量特别大,有时候特别小,这是用户行为,无法约束的,所以应该对消费端限流,用于保持消费端的稳定。

RabbitMQ提供了一种Qos(Quality of Service 服务质量)服务质量保证功能。
即在非自动确认消息的前提下,如果一定数据的消息未被确认前,不再进行消费新的消息。

  • 生产者循环发出多条消息

    for (int i = 0; i < 10; i++) {
              rabbitTemplate.convertAndSend("msg.user", map);
    }
    

    运行生产端,积压了10条消息
    image.png

  • 消费端进行限流处理

消费端配置:

    <!--  5. 配置监听
          acknowledge="manual" 手动确认
          prefetch="3" 一次性消费的消息数量,会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,一但有N个消息 还没有ack
          则Consumer将阻塞直到消息被ack
    -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="3">
        <rabbit:listener ref="consumerListener" queue-names="test_spring_queue_1"/>
    </rabbit:listener-container>

消费端监听类:

@Component
public class ConsumerListener extends AbstractAdaptableMessageListener {
    //jackson提供序列化和反序列化中使用最多的类,转换json的
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        // 将Message对象转换成json
        try {
            JsonNode jsonNode = MAPPER.readTree(message.getBody());
            String name = jsonNode.get("name").asText();
            String email = jsonNode.get("email").asText();
            System.out.println("从队列中获取:" + name + "的邮箱是:" + email);
            //手动确认消息
            //参数1:RabbitMQ向该channel投递的这条消息的唯一标识id,此id是一个单调递增的正整数
            //参数2:为了减少网络流量,手动确认可以被批量处理。当为true时,则可以一次性确认小于等于messageId值的所有消息
            //加入messageId=8 则可以确认<=8的所有消息
            long messageId = message.getMessageProperties().getDeliveryTag();
            System.out.println("messageId:" + messageId);
            channel.basicAck(messageId, true);
            Thread.sleep(3000);
            System.out.println("休息三秒后再继续接收消息");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

过期时间TTL

Time To Live:生存时间,单位:毫秒,如果超过了这个时间,则自动删除(被称为dead message 并投入到死信队列,无法消费该消息) RabbitMQ可以对消息和队列设置TTL

  • 通过队列设置,队列中所有都有相同的过期时间
  • 对消息单独设置,每条消息的TTL可以不同(更颗粒化)

设置队列TTL

在生产端配置:

    <!--  2. 配置队列  -->
    <!--    <rabbit:queue name="test_spring_queue_1"/>-->
    <rabbit:queue name="test_spring_queue_1" auto-declare="true">
        <rabbit:queue-arguments>
            <entry key="x-message-ttl" value-type="long" value="5000"/>
        </rabbit:queue-arguments>
    </rabbit:queue>

image.png
等待5S之后:消息自动删除了
image.png

设置消息TTL

队列配置

<rabbit:queue name="test_spring_queue_ttl_2"/>

发送消息的设置:

    public static void main(String[] args) {
        ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        //创建消息的配置对象
        MessageProperties properties = new MessageProperties();
        properties.setExpiration("3000");//设置过期时间三秒
        //创建消息
        Message message = new Message("测试过期时间".getBytes(), properties);
        rabbitTemplate.convertAndSend("msg.user", message);
        System.out.println("[消息已发出]");
        applicationContext.close();
    }

死信队列

DLX 死信交换机/死信邮箱,当消息在队列中由于某些原因没有被及时消费而变成死信后,这些消息就会被分发到DLX交换机中,而绑定DLX交换机的队列,称之为:死信队列。

消息没有被及时消费的原因:

  • 消息被拒绝(basic.reject/basic.nack)并且不再重新投递 requeue=false
  • 消息超时未消费
  • 达到最大队列长度

image.png
代码示例:
spring/spring-rabbitmq-producer2.xml

<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/tx
        http://www.springframework.org/schema/tx/spring-tx.xsd
        http://www.springframework.org/schema/rabbit
        http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
">
    <rabbit:connection-factory
            id="connectionFactory"
            host="172.16.150.130"
            port="5672"
            username="prim"
            password="123456"
            virtual-host="/edu"/>
    <rabbit:admin connection-factory="connectionFactory"/>
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>

    <!-- 死信的相关配置 -->

    <!-- 定义死信队列 -->
    <rabbit:queue name="dlx_queue"/>
    <!-- 定向的死信交换机 -->
    <rabbit:direct-exchange name="dle_exchange">
        <rabbit:bindings>
            <rabbit:binding key="dlx_ttl" queue="dlx_queue"/>
            <rabbit:binding key="dlx_max" queue="dlx_queue"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- 声明过期时间的消息队列 -->
    <rabbit:queue name="test_ttl_queue">
        <rabbit:queue-arguments>
            <!-- 设置队列的过期时间 -->
            <entry key="x-message-ttl" value-type="long" value="6000"/>
            <!-- 消息如果超时,将消息投递给死信交换机 -->
            <entry key="x-dead-letter-exchange" value="dle_exchange"/>
        </rabbit:queue-arguments>
    </rabbit:queue>

    <!-- 声明固定长度的消息队列 -->
    <rabbit:queue name="test_max_queue">
        <rabbit:queue-arguments>
            <!-- 设置队列的长度,最多装2个消息 -->
            <entry key="x-max-length" value-type="long" value="2"/>
            <!-- 消息如果超出长度,将消息投递给死信交换机 -->
            <entry key="x-dead-letter-exchange" value="dle_exchange"/>
        </rabbit:queue-arguments>
    </rabbit:queue>

    <!-- 声明定向的测试消息的交换机 -->
    <rabbit:direct-exchange name="my_exchange">
        <rabbit:bindings>
            <rabbit:binding key="dlx_ttl" queue="test_ttl_queue"/>
            <rabbit:binding key="dlx_max" queue="test_max_queue"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>
</beans>
  • 编写发送消息类
    public class SenderDLX {
      public static void main(String[] args) {
          //1. 创建spring容器
          ClassPathXmlApplicationContext applicationContext =
                  new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer-dlx.xml");
          //2. 从容器中获得Rabbit模板对象
          RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
          //传递路由键,将消息传递到配置的test_ttl_queue队列
          rabbitTemplate.convertAndSend("dlx_ttl", "测试超时".getBytes());
          System.out.println("[消息已发出]");
          applicationContext.close();
      }
    }
    

发送消息:在管理端查看如下,在test_ttl_queue中有一条消息,dlx_queue中的一条消息是我之前测试的。
image.png
等待6S之后: ,在test_ttl_queue中有0条消息,dlx_queue中的2条消息。
image.png

  • 测试消息超出长度,在配置中我们配置的是两条消息,发送三条消息进行测试

           rabbitTemplate.convertAndSend("dlx_max", "测试长度1".getBytes());
          rabbitTemplate.convertAndSend("dlx_max", "测试长度2".getBytes());
          rabbitTemplate.convertAndSend("dlx_max", "测试长度3".getBytes());
    

    查看管理端:可以看到test_max_queue 只有2条,超出的那条被挤到了死信队列dlx_queue
    image.png
    消息顺序发送 1 2 3,第一条消息被挤到了dlx_queue死信队列中了,如下查看死信队列的信息
    image.png

    延迟队列

    延迟队列:TTL + 死信队列的合体 死信队列只是一种特殊的队列,里面的消息仍然可以消费。 在电商开发部分中,都会涉及到延时关闭顶点,此时延迟队列正好可以解决这个问题

  • 生产者:沿用上面死信队列案例的超时测试,超时时间改为订单关闭时间即可。

  • 消费者:监听死信队列

      <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="3">
          <!-- 监听死信队列 -->
          <rabbit:listener ref="consumerListener" queue-names="dlx_queue"/>
      </rabbit:listener-container>
    
    @Component
    public class ConsumerListener extends AbstractAdaptableMessageListener {
      //jackson提供序列化和反序列化中使用最多的类,转换json的
      private static final ObjectMapper MAPPER = new ObjectMapper();
    
      @Override
      public void onMessage(Message message, Channel channel) throws Exception {
          // 将Message对象转换成json
          try {
              byte[] body = message.getBody();
              String s = new String(body);
              System.out.println(s);
    //            JsonNode jsonNode = MAPPER.readTree(message.getBody());
    //            String name = jsonNode.get("name").asText();
    //            String email = jsonNode.get("email").asText();
    //            System.out.println("从队列中获取:" + name + "的邮箱是:" + email);
              //手动确认消息
              //参数1:RabbitMQ向该channel投递的这条消息的唯一标识id,此id是一个单调递增的正整数
              //参数2:为了减少网络流量,手动确认可以被批量处理。当为true时,则可以一次性确认小于等于messageId值的所有消息
              //加入messageId=8 则可以确认<=8的所有消息
              long messageId = message.getMessageProperties().getDeliveryTag();
    //            System.out.println("messageId:" + messageId);
              channel.basicAck(messageId, true);
    //            Thread.sleep(3000);
    //            System.out.println("休息三秒后再继续接收消息");
          } catch (Exception e) {
              e.printStackTrace();
          }
      }
    }
    

    运行发送端:
    image.png
    消费端监听死信队列:拿到消息ACK确认,消息被消费掉了,所以dlx_queue队列中消息消费掉了 就没有了
    image.png
    image.png