想要Rabbitmq实现延迟队列,
需要使用Rabbitmq的死信交换机(Exchange)
和消息的存活时间TTL(Time To Live)
**
死信交换机
死信交换机就是一个存放过期消息的普通交换机
一个消息在满足以下条件时会进入死信交换机
- 被消费者拒收,并且reject方法的参数中的requeue的值是false
- 消息设置的TTL到了,消息过期
- 队列的长度限制已经满了,排在前面的消息会被丢弃或者进入死信路由上
参考:https://www.yuque.com/docs/share/d7ceb64a-6528-4a87-a159-4377dabad1de?# 《二、死信队列》
消息TTL
RabbitMq可以对队列和消息本身设置TTL,如果两者都设置了TTL,则会选取范围小的一个作为过期时间
可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是一样的效果。只是expiration字段是字符串参数,所以要写个int类型的字符串:
参考:https://www.yuque.com/docs/share/45574572-b247-4acc-bbb8-295b611a9452?# 《一、过期时间TTL》
实现定时任务
流程图

web页面创建交换机和队列
交换机
队列

参数详解:
x-message-ttl:过期时间x-dead-letter-exchange:代表消息过期后,消息要进入的交换机x-dead-letter-routing-key是配置消息过期后,进入死信交换机的routing-key,根据这个key将消息放入不同的队列
死信队列

所有进入这个队列的消息都会被处理
交换机绑定队列

delayqueue2的key要设置为创建正常队列的x-dead-letter-routing-key参数,这样当消息过期的时候就可以自动把消息放入delay_queue2这个队列中了
代码实现
发送消息
String msg = "hello word";MessageProperties messageProperties = new MessageProperties();messageProperties.setExpiration("6000");messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes());Message message = new Message(msg.getBytes(), messageProperties);rabbitTemplate.convertAndSend("delay", "delay",message);
接收消息
@Configurationpublic class DelayQueue {/** 消息交换机的名字*/public static final String EXCHANGE = "delay";/** 队列key1*/public static final String ROUTINGKEY1 = "delay";/** 队列key2*/public static final String ROUTINGKEY2 = "delay_key";/*** 配置链接信息* @return*/@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory("120.76.237.8",5672);connectionFactory.setUsername("kberp");connectionFactory.setPassword("kberp");connectionFactory.setVirtualHost("/");connectionFactory.setPublisherConfirms(true); // 必须要设置return connectionFactory;}/*** 配置消息交换机* 针对消费者配置FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念HeadersExchange :通过添加属性key-value匹配DirectExchange:按照routingkey分发到指定队列TopicExchange:多关键字匹配*/@Beanpublic DirectExchange defaultExchange() {return new DirectExchange(EXCHANGE, true, false);}/*** 配置消息队列2* 针对消费者配置* @return*/@Beanpublic Queue queue() {return new Queue("delay_queue2", true); //队列持久}/*** 将消息队列2与交换机绑定* 针对消费者配置* @return*/@Bean@Autowiredpublic Binding binding() {return BindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2);}/*** 接受消息的监听,这个监听会接受消息队列1的消息* 针对消费者配置* @return*/@Bean@Autowiredpublic SimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());container.setQueues(queue());container.setExposeListenerChannel(true);container.setMaxConcurrentConsumers(1);container.setConcurrentConsumers(1);container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认container.setMessageListener(new ChannelAwareMessageListener() {public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {byte[] body = message.getBody();System.out.println("delay_queue2 收到消息 : " + new String(body));channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费}});return container;}}
在消息监听中处理需要定时处理的任务就好了,因为Rabbitmq能发送消息,所以可以把任务特征码发过来,比如关闭订单就把订单id发过来,这样就避免了需要查询一下那些订单需要关闭而加重MySQL负担了,毕竟一旦订单量大的话,查询本身也是一件很费IO的事情
总结
基于Rabbitmq实现定时任务,就是将消息设置一个过期时间,放入一个没有读取的队列中,让消息过期后自动转入另外一个队列中,监控这个队列消息的监听处来处理定时任务具体的操作。
