想要Rabbitmq实现延迟队列,
需要使用Rabbitmq的死信交换机(Exchange)
消息的存活时间TTL(Time To Live)
**

死信交换机

死信交换机就是一个存放过期消息的普通交换机
一个消息在满足以下条件时会进入死信交换机

  • 被消费者拒收,并且reject方法的参数中的requeue的值是false
  • 消息设置的TTL到了,消息过期
  • 队列的长度限制已经满了,排在前面的消息会被丢弃或者进入死信路由上

参考:https://www.yuque.com/docs/share/d7ceb64a-6528-4a87-a159-4377dabad1de?# 《二、死信队列》

消息TTL

RabbitMq可以对队列和消息本身设置TTL,如果两者都设置了TTL,则会选取范围小的一个作为过期时间

可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是一样的效果。只是expiration字段是字符串参数,所以要写个int类型的字符串:
参考:https://www.yuque.com/docs/share/45574572-b247-4acc-bbb8-295b611a9452?# 《一、过期时间TTL》

实现定时任务

流程图

image.png

web页面创建交换机和队列

交换机

image.png

队列

image.png
参数详解:

  • x-message-ttl:过期时间
  • x-dead-letter-exchange:代表消息过期后,消息要进入的交换机
  • x-dead-letter-routing-key是配置消息过期后,进入死信交换机的routing-key,根据这个key将消息放入不同的队列

死信队列

image.png
所有进入这个队列的消息都会被处理

交换机绑定队列

image.png
delayqueue2的key要设置为创建正常队列的x-dead-letter-routing-key参数,这样当消息过期的时候就可以自动把消息放入delay_queue2这个队列中了

代码实现

发送消息

  1. String msg = "hello word";
  2. MessageProperties messageProperties = new MessageProperties();
  3. messageProperties.setExpiration("6000");
  4. messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes());
  5. Message message = new Message(msg.getBytes(), messageProperties);
  6. rabbitTemplate.convertAndSend("delay", "delay",message);

接收消息

  1. @Configuration
  2. public class DelayQueue {
  3. /** 消息交换机的名字*/
  4. public static final String EXCHANGE = "delay";
  5. /** 队列key1*/
  6. public static final String ROUTINGKEY1 = "delay";
  7. /** 队列key2*/
  8. public static final String ROUTINGKEY2 = "delay_key";
  9. /**
  10. * 配置链接信息
  11. * @return
  12. */
  13. @Bean
  14. public ConnectionFactory connectionFactory() {
  15. CachingConnectionFactory connectionFactory = new CachingConnectionFactory("120.76.237.8",5672);
  16. connectionFactory.setUsername("kberp");
  17. connectionFactory.setPassword("kberp");
  18. connectionFactory.setVirtualHost("/");
  19. connectionFactory.setPublisherConfirms(true); // 必须要设置
  20. return connectionFactory;
  21. }
  22. /**
  23. * 配置消息交换机
  24. * 针对消费者配置
  25. FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
  26. HeadersExchange :通过添加属性key-value匹配
  27. DirectExchange:按照routingkey分发到指定队列
  28. TopicExchange:多关键字匹配
  29. */
  30. @Bean
  31. public DirectExchange defaultExchange() {
  32. return new DirectExchange(EXCHANGE, true, false);
  33. }
  34. /**
  35. * 配置消息队列2
  36. * 针对消费者配置
  37. * @return
  38. */
  39. @Bean
  40. public Queue queue() {
  41. return new Queue("delay_queue2", true); //队列持久
  42. }
  43. /**
  44. * 将消息队列2与交换机绑定
  45. * 针对消费者配置
  46. * @return
  47. */
  48. @Bean
  49. @Autowired
  50. public Binding binding() {
  51. return BindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2);
  52. }
  53. /**
  54. * 接受消息的监听,这个监听会接受消息队列1的消息
  55. * 针对消费者配置
  56. * @return
  57. */
  58. @Bean
  59. @Autowired
  60. public SimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) {
  61. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
  62. container.setQueues(queue());
  63. container.setExposeListenerChannel(true);
  64. container.setMaxConcurrentConsumers(1);
  65. container.setConcurrentConsumers(1);
  66. container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
  67. container.setMessageListener(new ChannelAwareMessageListener() {
  68. public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
  69. byte[] body = message.getBody();
  70. System.out.println("delay_queue2 收到消息 : " + new String(body));
  71. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
  72. }
  73. });
  74. return container;
  75. }
  76. }

在消息监听中处理需要定时处理的任务就好了,因为Rabbitmq能发送消息,所以可以把任务特征码发过来,比如关闭订单就把订单id发过来,这样就避免了需要查询一下那些订单需要关闭而加重MySQL负担了,毕竟一旦订单量大的话,查询本身也是一件很费IO的事情

总结

基于Rabbitmq实现定时任务,就是将消息设置一个过期时间,放入一个没有读取的队列中,让消息过期后自动转入另外一个队列中,监控这个队列消息的监听处来处理定时任务具体的操作。