image.png
    image.png
    image.png
    image.png
    image.png
    下面将进行代码示例:

    一、SpringBoot环境配置
    1.xml

    1. <dependencies>
    2. <dependency>
    3. <groupId>org.springframework</groupId>
    4. <artifactId>spring-context</artifactId>
    5. <version>5.1.7.RELEASE</version>
    6. </dependency>
    7. <dependency>
    8. <groupId>org.springframework.amqp</groupId>
    9. <artifactId>spring-rabbit</artifactId>
    10. <version>2.1.8.RELEASE</version>
    11. </dependency>
    12. <dependency>
    13. <groupId>junit</groupId>
    14. <artifactId>junit</artifactId>
    15. <version>4.12</version>
    16. </dependency>
    17. <dependency>
    18. <groupId>org.springframework</groupId>
    19. <artifactId>spring-test</artifactId>
    20. <version>5.1.7.RELEASE</version>
    21. </dependency>
    22. </dependencies>
    23. <!--打包jar-->
    24. <build>
    25. <finalName>service-common-crm</finalName>
    26. <plugins>
    27. <!--spring-boot-maven-plugin-->
    28. <plugin>
    29. <groupId>org.springframework.boot</groupId>
    30. <artifactId>spring-boot-maven-plugin</artifactId>
    31. <!--解决打包出来的jar文件中没有主清单属性问题-->
    32. <executions>
    33. <execution>
    34. <goals>
    35. <goal>repackage</goal>
    36. </goals>
    37. </execution>
    38. </executions>
    39. </plugin>
    40. </plugins>
    41. </build>

    2.rabbitmq.properties配置文件

    rabbitmq.host=124.71.7.200
    rabbitmq.port=5672
    rabbitmq.username=azhi
    rabbitmq.password=2021888
    rabbitmq.virtual-host=vh_test
    

    3.Bean容器,生产者spring-rabbitmq-producer.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           https://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    
        <!--加载配置文件-->
        <context:property-placeholder location="classpath:rabbitmq.properties"/>
    
        <!-- 定义rabbitmq connectionFactory -->
        <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                                   port="${rabbitmq.port}"
                                   username="${rabbitmq.username}"
                                   password="${rabbitmq.password}"
                                   virtual-host="${rabbitmq.virtual-host}"
                                   publisher-confirms="true"
                                   publisher-returns="true"
        />
    
        <!--定义管理交换机、队列-->
        <rabbit:admin connection-factory="connectionFactory"/>
    
        <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
        <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
    
    
    
        <!--####################################消息可靠性投递 开始 ################################-->
        <!--消息可靠性投递(生产端)-->
        <rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue>
    
        <rabbit:direct-exchange name="test_exchange_confirm">
            <rabbit:bindings>
                <rabbit:binding queue="test_queue_confirm" key="confirm"></rabbit:binding>
            </rabbit:bindings>
        </rabbit:direct-exchange>
    
    
        <!--######################################消息可靠性投递 结束 ##########################################-->
    
        <!--####################################TTL 开始 ################################-->
        <!--ttl-->
         <rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
             <!--设置queue的参数-->
             <rabbit:queue-arguments>
                 <!--x-message-ttl指队列的过期时间-->
                 <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry>
             </rabbit:queue-arguments>
    
         </rabbit:queue>
    
        <rabbit:topic-exchange name="test_exchange_ttl" >
             <rabbit:bindings>
                 <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding>
             </rabbit:bindings>
         </rabbit:topic-exchange>
    
        <!--######################################TTL 结束 ##########################################-->
    
    
    
    
    
        <!--#######################################死信队列 开始 #########################################-->
        <!--
            死信队列:
                1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
                2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
                3. 正常队列绑定死信交换机
                    设置两个参数:
                        * x-dead-letter-exchange:死信交换机名称
                        * x-dead-letter-routing-key:发送给死信交换机的routingkey
        -->
    
        <!--
            1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
        -->
    
        <rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
            <!--3. 正常队列绑定死信交换机-->
            <rabbit:queue-arguments>
                <!--3.1 x-dead-letter-exchange:死信交换机名称-->
                <entry key="x-dead-letter-exchange" value="exchange_dlx" />
    
                <!--3.2 x-dead-letter-routing-key:发送给死信交换机的routingkey-->
                <entry key="x-dead-letter-routing-key" value="dlx.hehe" />
    
                <!--4.1 设置队列的过期时间 ttl-->
                <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
                <!--4.2 设置队列的长度限制 max-length-->
                <entry key="x-max-length" value="10" value-type="java.lang.Integer" />
            </rabbit:queue-arguments>
        </rabbit:queue>
    
        <rabbit:topic-exchange name="test_exchange_dlx">
            <rabbit:bindings>
                <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding>
            </rabbit:bindings>
        </rabbit:topic-exchange>
    
    
        <!--
           2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
       -->
    
        <rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue>
        <rabbit:topic-exchange name="exchange_dlx">
            <rabbit:bindings>
                <rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
            </rabbit:bindings>
        </rabbit:topic-exchange>
        <!--######################################死信队列 结束##########################################-->
    
    
    
    
    
        <!--########################################延迟队列 开始########################################-->
    
        <!--
            延迟队列:
                1. 定义正常交换机(order_exchange)和队列(order_queue)
                2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)
                3. 绑定,设置正常队列过期时间为30分钟
        -->
        <!-- 1. 定义正常交换机(order_exchange)和队列(order_queue)-->
        <rabbit:queue id="order_queue" name="order_queue">
            <!--3. 绑定,设置正常队列过期时间为30分钟-->
            <rabbit:queue-arguments>
                <entry key="x-dead-letter-exchange" value="order_exchange_dlx" />
                <entry key="x-dead-letter-routing-key" value="dlx.order.cancel" />
                <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
            </rabbit:queue-arguments>
        </rabbit:queue>
    
        <rabbit:topic-exchange name="order_exchange">
            <rabbit:bindings>
                <rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding>
            </rabbit:bindings>
        </rabbit:topic-exchange>
    
        <!--2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)-->
        <rabbit:queue id="order_queue_dlx" name="order_queue_dlx"></rabbit:queue>
    
        <rabbit:topic-exchange name="order_exchange_dlx">
            <rabbit:bindings>
                <rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding>
            </rabbit:bindings>
        </rabbit:topic-exchange>
        <!--######################################延迟队列 结束 ##########################################-->
    
    </beans>
    

    4.Bean窗口,消费者spring-rabbitmq-consumer.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           https://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
        <!--加载配置文件-->
        <context:property-placeholder location="classpath:rabbitmq.properties"/>
    
        <!-- 定义rabbitmq connectionFactory -->
        <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                                   port="${rabbitmq.port}"
                                   username="${rabbitmq.username}"
                                   password="${rabbitmq.password}"
                                   virtual-host="${rabbitmq.virtual-host}"/>
    
    
        <context:component-scan base-package="com.baiqi.listener" />
    
        <!--定义监听器容器
          acknowledge="manual":手动签收
          prefetch="1":每次抓取多少条消息
        -->
    
        <!--定义监听器容器  acknowledge="manual"  prefetch="1"  -->
        <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
            <rabbit:listener ref="ackListener" queue-names="test_queue_confirm"></rabbit:listener>
            <!--<rabbit:listener ref="qosListener" queue-names="test_queue_confirm"></rabbit:listener>-->
    
            <!--定义监听器,监听正常队列-->
            <!--<rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"></rabbit:listener>-->
    
            <!--延迟队列效果实现:  一定要监听的是 死信队列!!!-->
    
            <rabbit:listener ref="orderListener" queue-names="order_queue_dlx"></rabbit:listener>
        </rabbit:listener-container>
    </beans>
    

    二、生产者测试代码

    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
    public class ProducerTest {
        @Autowired
        private RabbitTemplate rabbitTemplate;
        //测试   Confirm 模式
        @Test
        public void testConfirm() {
            //定义回调
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                /**
                 *
                 * @param correlationData 相关配置信息
                 * @param ack   exchange交换机 是否成功收到了消息。true 成功,false代表失败
                 * @param cause 失败原因
                 */
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    //System.out.println("confirm方法被执行了...."+correlationData.getId());
                    //ack 为  true表示 消息已经到达交换机
                    if (ack) {
                        //接收成功
                        System.out.println("接收成功消息" + cause);
                    } else {
                        //接收失败
                        System.out.println("接收失败消息" + cause);
                        //做一些处理,让消息再次发送。
                    }
                }
            });
    
            //进行消息发送
            for (int i = 0; i < 5; i++) {
                rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message Confirm...");
            }
    
            //进行睡眠操作
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        //测试 return模式
        @Test
        public void testReturn() {
            //设置交换机处理失败消息的模式   为true的时候,消息达到不了 队列时,会将消息重新返回给生产者
            rabbitTemplate.setMandatory(true);
            //定义回调
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                /**
                 *
                 * @param message   消息对象
                 * @param replyCode 错误码
                 * @param replyText 错误信息
                 * @param exchange  交换机
                 * @param routingKey 路由键
                 */
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    System.out.println("return 执行了....");
                    System.out.println("message:" + message);
                    System.out.println("replyCode:" + replyCode);
                    System.out.println("replyText:" + replyText);
                    System.out.println("exchange:" + exchange);
                    System.out.println("routingKey:" + routingKey);
                    //处理
                }
            });
            //进行消息发送
            rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm11", "message return...");
    
            //进行睡眠操作
            try {
                Thread.sleep(5000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        //批量发送消息,让消费者每次拉去指定的数量
        @Test
        public void testQos() {
            for (int i = 0; i < 10; i++) {
                // 发送消息
                rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");
            }
        }
    
        /**
         * TTL:过期时间
         * 1. 队列统一过期
         * <p>
         * 2. 消息单独过期
         * <p>
         * <p>
         * 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。
         * 队列过期后,会将队列所有消息全部移除。
         * 消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉)
         */
        @Test
        public void testTtl() {
            for (int i = 0; i < 10; i++) {
                // 发送消息
                rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.baiqi", "message ttl....");
            }
        }
    
        /**
         * 发送测试死信消息:
         * 1. 过期时间
         * 2. 长度限制
         * 3. 消息拒收
         */
        @Test
        public void testDlx() {
            //1. 测试过期时间,死信消息
            rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.hehe", "死信消息测试1,开始...");
    
            //2. 测试长度限制后,消息死信
           /* for (int i = 0; i < 20; i++) {
                rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.hehe","死信消息测试2,开始...");
            }*/
    
            //3. 测试消息拒收
            //rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.baiqi","死信消息测试3,开始...");
        }
    
        /*
         * 测试延时消息
         * */
        @Test
        public void testDelay() throws InterruptedException {
            //1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息
            rabbitTemplate.convertAndSend("order_exchange", "order.msg", "订单信息:id=1,time=2020年12月...");
            //2.打印倒计时10秒
            for (int i = 10; i > 0; i--) {
                System.out.println(i + "...");
                Thread.sleep(1000);
            }
        }
    }
    

    执行testConfirm结果
    image.png
    执行testReturn结果
    image.png
    三、消费者测试代码
    1.初始化IOC容器

    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    public class Test {
        public static void main(String[] args) {
            //初始化IOC容器
            ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:spring-rabbitmq-consumer.xml");
        }
    }
    

    2.AckListener类

    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
    import org.springframework.stereotype.Component;
    
    /**
     */
    @Component
    public class AckListener implements ChannelAwareMessageListener {
    
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            //1、获取消息的id
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
    
            try {
    
            //2、获取消息
            System.out.println("message:"+new String(message.getBody()));
    
            //3、进行业务处理
            System.out.println("=====进行业务处理====");
    
            //模拟出现异常
            //int  i = 5/0;
    
            //4、进行消息签收
                //channel.basicAck(deliveryTag, false);
                System.out.println("收到了消息:"+deliveryTag);
            } catch (Exception e) {
    
                //拒绝签收
                 /*
                第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
                 */
               // channel.basicNack(deliveryTag, false, true);
    
            }
        }
    }
    

    3.DlxListener类

    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class DlxListener implements ChannelAwareMessageListener {
    
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
    
            try {
                //1.接收转换消息
                System.out.println(new String(message.getBody()));
    
                //2. 处理业务逻辑
                System.out.println("处理业务逻辑...");
                //int i = 3/0;//出现错误
                //3. 手动签收
                channel.basicAck(deliveryTag,true);
            } catch (Exception e) {
                //e.printStackTrace();
                System.out.println("出现异常,拒绝接受");
                //4.拒绝签收,不重回队列 requeue=false
                channel.basicNack(deliveryTag,true,false);
            }
        }
    }
    

    4.OrderListener类

    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class OrderListener implements ChannelAwareMessageListener {
    
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
    
            try {
                //1.接收转换消息
                System.out.println(new String(message.getBody()));
    
                //2. 处理业务逻辑
                System.out.println("处理业务逻辑...");
                System.out.println("根据订单id查询其状态...");
                System.out.println("判断状态是否为支付成功");
                System.out.println("取消订单,回滚库存....");
                //3. 手动签收
                channel.basicAck(deliveryTag,true);
            } catch (Exception e) {
                //e.printStackTrace();
                System.out.println("出现异常,拒绝接受");
                //4.拒绝签收,不重回队列 requeue=false
                channel.basicNack(deliveryTag,true,false);
            }
        }
    }
    

    5.QosListener类

    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
    import org.springframework.stereotype.Component;
    
    /**
     */
    @Component
    public class QosListener implements ChannelAwareMessageListener {
    
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            //获取到的消息
            System.out.println(new String(message.getBody()));
    
            Thread.sleep(1000);
    
            //处理业务逻辑
    
            //进行消息的签收
            //channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
        }
    }