image.png
    image.png
    image.png
    image.png
    image.png
    下面将进行代码示例:

    一、SpringBoot环境配置
    1.xml

    1. <dependencies>
    2. <dependency>
    3. <groupId>org.springframework</groupId>
    4. <artifactId>spring-context</artifactId>
    5. <version>5.1.7.RELEASE</version>
    6. </dependency>
    7. <dependency>
    8. <groupId>org.springframework.amqp</groupId>
    9. <artifactId>spring-rabbit</artifactId>
    10. <version>2.1.8.RELEASE</version>
    11. </dependency>
    12. <dependency>
    13. <groupId>junit</groupId>
    14. <artifactId>junit</artifactId>
    15. <version>4.12</version>
    16. </dependency>
    17. <dependency>
    18. <groupId>org.springframework</groupId>
    19. <artifactId>spring-test</artifactId>
    20. <version>5.1.7.RELEASE</version>
    21. </dependency>
    22. </dependencies>
    23. <!--打包jar-->
    24. <build>
    25. <finalName>service-common-crm</finalName>
    26. <plugins>
    27. <!--spring-boot-maven-plugin-->
    28. <plugin>
    29. <groupId>org.springframework.boot</groupId>
    30. <artifactId>spring-boot-maven-plugin</artifactId>
    31. <!--解决打包出来的jar文件中没有主清单属性问题-->
    32. <executions>
    33. <execution>
    34. <goals>
    35. <goal>repackage</goal>
    36. </goals>
    37. </execution>
    38. </executions>
    39. </plugin>
    40. </plugins>
    41. </build>

    2.rabbitmq.properties配置文件

    1. rabbitmq.host=124.71.7.200
    2. rabbitmq.port=5672
    3. rabbitmq.username=azhi
    4. rabbitmq.password=2021888
    5. rabbitmq.virtual-host=vh_test

    3.Bean容器,生产者spring-rabbitmq-producer.xml

    1. <?xml version="1.0" encoding="UTF-8"?>
    2. <beans xmlns="http://www.springframework.org/schema/beans"
    3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    4. xmlns:context="http://www.springframework.org/schema/context"
    5. xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    6. xsi:schemaLocation="http://www.springframework.org/schema/beans
    7. http://www.springframework.org/schema/beans/spring-beans.xsd
    8. http://www.springframework.org/schema/context
    9. https://www.springframework.org/schema/context/spring-context.xsd
    10. http://www.springframework.org/schema/rabbit
    11. http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    12. <!--加载配置文件-->
    13. <context:property-placeholder location="classpath:rabbitmq.properties"/>
    14. <!-- 定义rabbitmq connectionFactory -->
    15. <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
    16. port="${rabbitmq.port}"
    17. username="${rabbitmq.username}"
    18. password="${rabbitmq.password}"
    19. virtual-host="${rabbitmq.virtual-host}"
    20. publisher-confirms="true"
    21. publisher-returns="true"
    22. />
    23. <!--定义管理交换机、队列-->
    24. <rabbit:admin connection-factory="connectionFactory"/>
    25. <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
    26. <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
    27. <!--####################################消息可靠性投递 开始 ################################-->
    28. <!--消息可靠性投递(生产端)-->
    29. <rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue>
    30. <rabbit:direct-exchange name="test_exchange_confirm">
    31. <rabbit:bindings>
    32. <rabbit:binding queue="test_queue_confirm" key="confirm"></rabbit:binding>
    33. </rabbit:bindings>
    34. </rabbit:direct-exchange>
    35. <!--######################################消息可靠性投递 结束 ##########################################-->
    36. <!--####################################TTL 开始 ################################-->
    37. <!--ttl-->
    38. <rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
    39. <!--设置queue的参数-->
    40. <rabbit:queue-arguments>
    41. <!--x-message-ttl指队列的过期时间-->
    42. <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry>
    43. </rabbit:queue-arguments>
    44. </rabbit:queue>
    45. <rabbit:topic-exchange name="test_exchange_ttl" >
    46. <rabbit:bindings>
    47. <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding>
    48. </rabbit:bindings>
    49. </rabbit:topic-exchange>
    50. <!--######################################TTL 结束 ##########################################-->
    51. <!--#######################################死信队列 开始 #########################################-->
    52. <!--
    53. 死信队列:
    54. 1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
    55. 2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
    56. 3. 正常队列绑定死信交换机
    57. 设置两个参数:
    58. * x-dead-letter-exchange:死信交换机名称
    59. * x-dead-letter-routing-key:发送给死信交换机的routingkey
    60. -->
    61. <!--
    62. 1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
    63. -->
    64. <rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
    65. <!--3. 正常队列绑定死信交换机-->
    66. <rabbit:queue-arguments>
    67. <!--3.1 x-dead-letter-exchange:死信交换机名称-->
    68. <entry key="x-dead-letter-exchange" value="exchange_dlx" />
    69. <!--3.2 x-dead-letter-routing-key:发送给死信交换机的routingkey-->
    70. <entry key="x-dead-letter-routing-key" value="dlx.hehe" />
    71. <!--4.1 设置队列的过期时间 ttl-->
    72. <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
    73. <!--4.2 设置队列的长度限制 max-length-->
    74. <entry key="x-max-length" value="10" value-type="java.lang.Integer" />
    75. </rabbit:queue-arguments>
    76. </rabbit:queue>
    77. <rabbit:topic-exchange name="test_exchange_dlx">
    78. <rabbit:bindings>
    79. <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding>
    80. </rabbit:bindings>
    81. </rabbit:topic-exchange>
    82. <!--
    83. 2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
    84. -->
    85. <rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue>
    86. <rabbit:topic-exchange name="exchange_dlx">
    87. <rabbit:bindings>
    88. <rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
    89. </rabbit:bindings>
    90. </rabbit:topic-exchange>
    91. <!--######################################死信队列 结束##########################################-->
    92. <!--########################################延迟队列 开始########################################-->
    93. <!--
    94. 延迟队列:
    95. 1. 定义正常交换机(order_exchange)和队列(order_queue)
    96. 2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)
    97. 3. 绑定,设置正常队列过期时间为30分钟
    98. -->
    99. <!-- 1. 定义正常交换机(order_exchange)和队列(order_queue)-->
    100. <rabbit:queue id="order_queue" name="order_queue">
    101. <!--3. 绑定,设置正常队列过期时间为30分钟-->
    102. <rabbit:queue-arguments>
    103. <entry key="x-dead-letter-exchange" value="order_exchange_dlx" />
    104. <entry key="x-dead-letter-routing-key" value="dlx.order.cancel" />
    105. <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
    106. </rabbit:queue-arguments>
    107. </rabbit:queue>
    108. <rabbit:topic-exchange name="order_exchange">
    109. <rabbit:bindings>
    110. <rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding>
    111. </rabbit:bindings>
    112. </rabbit:topic-exchange>
    113. <!--2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)-->
    114. <rabbit:queue id="order_queue_dlx" name="order_queue_dlx"></rabbit:queue>
    115. <rabbit:topic-exchange name="order_exchange_dlx">
    116. <rabbit:bindings>
    117. <rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding>
    118. </rabbit:bindings>
    119. </rabbit:topic-exchange>
    120. <!--######################################延迟队列 结束 ##########################################-->
    121. </beans>

    4.Bean窗口,消费者spring-rabbitmq-consumer.xml

    1. <?xml version="1.0" encoding="UTF-8"?>
    2. <beans xmlns="http://www.springframework.org/schema/beans"
    3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    4. xmlns:context="http://www.springframework.org/schema/context"
    5. xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    6. xsi:schemaLocation="http://www.springframework.org/schema/beans
    7. http://www.springframework.org/schema/beans/spring-beans.xsd
    8. http://www.springframework.org/schema/context
    9. https://www.springframework.org/schema/context/spring-context.xsd
    10. http://www.springframework.org/schema/rabbit
    11. http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    12. <!--加载配置文件-->
    13. <context:property-placeholder location="classpath:rabbitmq.properties"/>
    14. <!-- 定义rabbitmq connectionFactory -->
    15. <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
    16. port="${rabbitmq.port}"
    17. username="${rabbitmq.username}"
    18. password="${rabbitmq.password}"
    19. virtual-host="${rabbitmq.virtual-host}"/>
    20. <context:component-scan base-package="com.baiqi.listener" />
    21. <!--定义监听器容器
    22. acknowledge="manual":手动签收
    23. prefetch="1":每次抓取多少条消息
    24. -->
    25. <!--定义监听器容器 acknowledge="manual" prefetch="1" -->
    26. <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
    27. <rabbit:listener ref="ackListener" queue-names="test_queue_confirm"></rabbit:listener>
    28. <!--<rabbit:listener ref="qosListener" queue-names="test_queue_confirm"></rabbit:listener>-->
    29. <!--定义监听器,监听正常队列-->
    30. <!--<rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"></rabbit:listener>-->
    31. <!--延迟队列效果实现: 一定要监听的是 死信队列!!!-->
    32. <rabbit:listener ref="orderListener" queue-names="order_queue_dlx"></rabbit:listener>
    33. </rabbit:listener-container>
    34. </beans>

    二、生产者测试代码

    1. @RunWith(SpringJUnit4ClassRunner.class)
    2. @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
    3. public class ProducerTest {
    4. @Autowired
    5. private RabbitTemplate rabbitTemplate;
    6. //测试 Confirm 模式
    7. @Test
    8. public void testConfirm() {
    9. //定义回调
    10. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    11. /**
    12. *
    13. * @param correlationData 相关配置信息
    14. * @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败
    15. * @param cause 失败原因
    16. */
    17. @Override
    18. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    19. //System.out.println("confirm方法被执行了...."+correlationData.getId());
    20. //ack 为 true表示 消息已经到达交换机
    21. if (ack) {
    22. //接收成功
    23. System.out.println("接收成功消息" + cause);
    24. } else {
    25. //接收失败
    26. System.out.println("接收失败消息" + cause);
    27. //做一些处理,让消息再次发送。
    28. }
    29. }
    30. });
    31. //进行消息发送
    32. for (int i = 0; i < 5; i++) {
    33. rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message Confirm...");
    34. }
    35. //进行睡眠操作
    36. try {
    37. Thread.sleep(1000);
    38. } catch (Exception e) {
    39. e.printStackTrace();
    40. }
    41. }
    42. //测试 return模式
    43. @Test
    44. public void testReturn() {
    45. //设置交换机处理失败消息的模式 为true的时候,消息达到不了 队列时,会将消息重新返回给生产者
    46. rabbitTemplate.setMandatory(true);
    47. //定义回调
    48. rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
    49. /**
    50. *
    51. * @param message 消息对象
    52. * @param replyCode 错误码
    53. * @param replyText 错误信息
    54. * @param exchange 交换机
    55. * @param routingKey 路由键
    56. */
    57. @Override
    58. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    59. System.out.println("return 执行了....");
    60. System.out.println("message:" + message);
    61. System.out.println("replyCode:" + replyCode);
    62. System.out.println("replyText:" + replyText);
    63. System.out.println("exchange:" + exchange);
    64. System.out.println("routingKey:" + routingKey);
    65. //处理
    66. }
    67. });
    68. //进行消息发送
    69. rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm11", "message return...");
    70. //进行睡眠操作
    71. try {
    72. Thread.sleep(5000);
    73. } catch (Exception e) {
    74. e.printStackTrace();
    75. }
    76. }
    77. //批量发送消息,让消费者每次拉去指定的数量
    78. @Test
    79. public void testQos() {
    80. for (int i = 0; i < 10; i++) {
    81. // 发送消息
    82. rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");
    83. }
    84. }
    85. /**
    86. * TTL:过期时间
    87. * 1. 队列统一过期
    88. * <p>
    89. * 2. 消息单独过期
    90. * <p>
    91. * <p>
    92. * 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。
    93. * 队列过期后,会将队列所有消息全部移除。
    94. * 消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉)
    95. */
    96. @Test
    97. public void testTtl() {
    98. for (int i = 0; i < 10; i++) {
    99. // 发送消息
    100. rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.baiqi", "message ttl....");
    101. }
    102. }
    103. /**
    104. * 发送测试死信消息:
    105. * 1. 过期时间
    106. * 2. 长度限制
    107. * 3. 消息拒收
    108. */
    109. @Test
    110. public void testDlx() {
    111. //1. 测试过期时间,死信消息
    112. rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.hehe", "死信消息测试1,开始...");
    113. //2. 测试长度限制后,消息死信
    114. /* for (int i = 0; i < 20; i++) {
    115. rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.hehe","死信消息测试2,开始...");
    116. }*/
    117. //3. 测试消息拒收
    118. //rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.baiqi","死信消息测试3,开始...");
    119. }
    120. /*
    121. * 测试延时消息
    122. * */
    123. @Test
    124. public void testDelay() throws InterruptedException {
    125. //1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息
    126. rabbitTemplate.convertAndSend("order_exchange", "order.msg", "订单信息:id=1,time=2020年12月...");
    127. //2.打印倒计时10秒
    128. for (int i = 10; i > 0; i--) {
    129. System.out.println(i + "...");
    130. Thread.sleep(1000);
    131. }
    132. }
    133. }

    执行testConfirm结果
    image.png
    执行testReturn结果
    image.png
    三、消费者测试代码
    1.初始化IOC容器

    1. import org.springframework.context.ApplicationContext;
    2. import org.springframework.context.support.ClassPathXmlApplicationContext;
    3. public class Test {
    4. public static void main(String[] args) {
    5. //初始化IOC容器
    6. ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:spring-rabbitmq-consumer.xml");
    7. }
    8. }

    2.AckListener类

    1. import com.rabbitmq.client.Channel;
    2. import org.springframework.amqp.core.Message;
    3. import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
    4. import org.springframework.stereotype.Component;
    5. /**
    6. */
    7. @Component
    8. public class AckListener implements ChannelAwareMessageListener {
    9. @Override
    10. public void onMessage(Message message, Channel channel) throws Exception {
    11. //1、获取消息的id
    12. long deliveryTag = message.getMessageProperties().getDeliveryTag();
    13. try {
    14. //2、获取消息
    15. System.out.println("message:"+new String(message.getBody()));
    16. //3、进行业务处理
    17. System.out.println("=====进行业务处理====");
    18. //模拟出现异常
    19. //int i = 5/0;
    20. //4、进行消息签收
    21. //channel.basicAck(deliveryTag, false);
    22. System.out.println("收到了消息:"+deliveryTag);
    23. } catch (Exception e) {
    24. //拒绝签收
    25. /*
    26. 第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
    27. */
    28. // channel.basicNack(deliveryTag, false, true);
    29. }
    30. }
    31. }

    3.DlxListener类

    1. import com.rabbitmq.client.Channel;
    2. import org.springframework.amqp.core.Message;
    3. import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
    4. import org.springframework.stereotype.Component;
    5. @Component
    6. public class DlxListener implements ChannelAwareMessageListener {
    7. @Override
    8. public void onMessage(Message message, Channel channel) throws Exception {
    9. long deliveryTag = message.getMessageProperties().getDeliveryTag();
    10. try {
    11. //1.接收转换消息
    12. System.out.println(new String(message.getBody()));
    13. //2. 处理业务逻辑
    14. System.out.println("处理业务逻辑...");
    15. //int i = 3/0;//出现错误
    16. //3. 手动签收
    17. channel.basicAck(deliveryTag,true);
    18. } catch (Exception e) {
    19. //e.printStackTrace();
    20. System.out.println("出现异常,拒绝接受");
    21. //4.拒绝签收,不重回队列 requeue=false
    22. channel.basicNack(deliveryTag,true,false);
    23. }
    24. }
    25. }

    4.OrderListener类

    1. import com.rabbitmq.client.Channel;
    2. import org.springframework.amqp.core.Message;
    3. import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
    4. import org.springframework.stereotype.Component;
    5. @Component
    6. public class OrderListener implements ChannelAwareMessageListener {
    7. @Override
    8. public void onMessage(Message message, Channel channel) throws Exception {
    9. long deliveryTag = message.getMessageProperties().getDeliveryTag();
    10. try {
    11. //1.接收转换消息
    12. System.out.println(new String(message.getBody()));
    13. //2. 处理业务逻辑
    14. System.out.println("处理业务逻辑...");
    15. System.out.println("根据订单id查询其状态...");
    16. System.out.println("判断状态是否为支付成功");
    17. System.out.println("取消订单,回滚库存....");
    18. //3. 手动签收
    19. channel.basicAck(deliveryTag,true);
    20. } catch (Exception e) {
    21. //e.printStackTrace();
    22. System.out.println("出现异常,拒绝接受");
    23. //4.拒绝签收,不重回队列 requeue=false
    24. channel.basicNack(deliveryTag,true,false);
    25. }
    26. }
    27. }

    5.QosListener类

    1. import com.rabbitmq.client.Channel;
    2. import org.springframework.amqp.core.Message;
    3. import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
    4. import org.springframework.stereotype.Component;
    5. /**
    6. */
    7. @Component
    8. public class QosListener implements ChannelAwareMessageListener {
    9. @Override
    10. public void onMessage(Message message, Channel channel) throws Exception {
    11. //获取到的消息
    12. System.out.println(new String(message.getBody()));
    13. Thread.sleep(1000);
    14. //处理业务逻辑
    15. //进行消息的签收
    16. //channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    17. }
    18. }