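Goal: deliver messages to consumers after a delay.
Use case: handling orders that are still unpaid when their payment window expires.
Solution: the rabbitmq_delayed_message_exchange plugin, which must be installed separately because RabbitMQ does not provide this feature out of the box. (Alibaba's paid RocketMQ supports custom delay times.)
The test procedure follows.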

Environment setup

  • Option 1:

Pull a RabbitMQ image that already includes the plugin; see GitHub: https://github.com/heidiks/rabbitmq-delayed-message-exchange

    docker run -di \
      -p 5672:5672 \
      -p 15672:15672 \
      --name tian_rabbitmq \
      heidiks/rabbitmq-delayed-message-exchange:latest

Default username and password: guest / guest

  • Option 2:

Install the plugin manually

1. Download the rabbitmq_delayed_message_exchange plugin, making sure its version matches your RabbitMQ version.

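2. Place it in the plugins directory under the RabbitMQ installation directory, then enable it with the following command:

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

3. Restart RabbitMQ so that the change takes effect.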

Spring Boot integration test

Preparation

1. Add the dependency

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

2. Configure the properties

    server:
      port: 8881
    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest

Sender

  • Custom ConnectionFactory and RabbitTemplate

```java
package com.mq.rabbitmq;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// The spring.rabbitmq.* properties are bound to the fields below through the setters.
@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMqConfig {
    private String host;
    private int port;
    private String userName;
    private String password;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(host, port);
        cachingConnectionFactory.setUsername(userName);
        cachingConnectionFactory.setPassword(password);
        cachingConnectionFactory.setVirtualHost("/");
        // Enable publisher confirms (deprecated in newer Spring AMQP versions
        // in favor of setPublisherConfirmType).
        cachingConnectionFactory.setPublisherConfirms(true);
        return cachingConnectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}
```

  • Exchange and queue configuration

```java
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class QueueConfig {

    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        // Once the delay has elapsed, route the message like a direct exchange.
        args.put("x-delayed-type", "direct");
        // Arguments: name, type (provided by the plugin), durable, autoDelete, arguments.
        return new CustomExchange("test_exchange", "x-delayed-message", true, false, args);
    }

    @Bean
    public Queue queue() {
        // Durable queue.
        return new Queue("test_queue_1", true);
    }

    @Bean
    public Binding binding() {
        // The routing key is the same as the queue name.
        return BindingBuilder.bind(queue()).to(delayExchange()).with("test_queue_1").noargs();
    }
}
```
  • Message sending method

```java
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.text.SimpleDateFormat;
import java.util.Date;

@Service
public class MessageServiceImpl {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg(String queueName, String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("Message sent at: " + sdf.format(new Date()));
        // queueName is used as the routing key, which matches the binding in QueueConfig.
        rabbitTemplate.convertAndSend("test_exchange", queueName, msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // Delay in milliseconds, read by the delayed-message exchange.
                message.getMessageProperties().setHeader("x-delay", 3000);
                return message;
            }
        });
    }
}
```
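The sender above hard-codes a 3000 ms delay. For the unpaid-order use case mentioned at the top, the delay would more likely be derived from the order's payment deadline. The following is a minimal sketch using only the JDK; the class `PaymentDelay`, its method `delayMillis`, and the 30-minute timeout are hypothetical and not part of the original project.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper for illustration; not part of the original project.
final class PaymentDelay {

    private PaymentDelay() {
    }

    // Milliseconds from 'now' until the payment deadline (createdAt + timeout),
    // clamped to 0 when the deadline has already passed.
    static long delayMillis(Instant createdAt, Duration timeout, Instant now) {
        Instant deadline = createdAt.plus(timeout);
        long millis = Duration.between(now, deadline).toMillis();
        return Math.max(0L, millis);
    }

    public static void main(String[] args) {
        Instant createdAt = Instant.parse("2021-03-11T12:00:00Z");
        Instant now = Instant.parse("2021-03-11T12:10:00Z");
        // 30-minute timeout with 10 minutes already elapsed: 20 minutes remain.
        long delay = delayMillis(createdAt, Duration.ofMinutes(30), now);
        System.out.println(delay); // prints 1200000
    }
}
```

The result would replace the literal `3000` in the `x-delay` header. Since the original passes an `int`, narrowing with `Math.toIntExact(delay)` keeps the header type unchanged.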

  • Invocation

![image.png](https://cdn.nlark.com/yuque/0/2021/png/241764/1615437001326-d4727163-83ba-4332-8bee-5270a1ad07e5.png#align=left&display=inline&height=485&margin=%5Bobject%20Object%5D&name=image.png&originHeight=485&originWidth=891&size=65953&status=done&style=none&width=891)
<a name="5YFYA"></a>
### Receiver
The receiver works as usual (unlike the sender, it does not need a custom template).
```java
package com.mq.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

@Component
public class MessageReceiver {

    // Consumes from the queue bound to the delayed exchange.
    @RabbitListener(queues = "test_queue_1")
    public void receive(String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("Message received at: " + sdf.format(new Date()));
        System.out.println("Received message: " + msg);
    }
}
```

Result:

The receive time is 3 seconds after the send time, as expected.
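The gap can also be checked programmatically rather than by eye. Below is a minimal sketch, assuming the two timestamps are copied from the console output in the `yyyy-MM-dd HH:mm:ss` format that the sender and receiver print. The class name `DelayCheck` and the sample timestamps are hypothetical, not actual output from this test run.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for illustration; not part of the original project.
final class DelayCheck {

    // Same pattern the sender and receiver use when printing timestamps.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private DelayCheck() {
    }

    // Whole seconds between the printed send time and the printed receive time.
    static long elapsedSeconds(String sentAt, String receivedAt) {
        LocalDateTime sent = LocalDateTime.parse(sentAt, FORMAT);
        LocalDateTime received = LocalDateTime.parse(receivedAt, FORMAT);
        return Duration.between(sent, received).getSeconds();
    }

    public static void main(String[] args) {
        // Sample values only; substitute the timestamps from your own run.
        long elapsed = elapsedSeconds("2021-03-11 12:00:00", "2021-03-11 12:00:03");
        System.out.println(elapsed); // prints 3
    }
}
```

Because the printed timestamps have one-second resolution, a measured gap of 3 or 4 seconds would both be consistent with a 3000 ms delay.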

Reference

https://juejin.cn/post/6844903601286955021