想要Rabbitmq实现延迟队列,
需要使用Rabbitmq的死信交换机(Exchange)
和消息的存活时间TTL(Time To Live)
**
死信交换机
死信交换机就是一个存放过期消息的普通交换机
一个消息在满足以下条件时会进入死信交换机
- 被消费者拒收,并且reject方法的参数中的requeue的值是false
- 消息设置的TTL到了,消息过期
- 队列的长度限制已经满了,排在前面的消息会被丢弃或者进入死信路由上
参考:https://www.yuque.com/docs/share/d7ceb64a-6528-4a87-a159-4377dabad1de?# 《二、死信队列》
消息TTL
RabbitMq可以对队列和消息本身设置TTL,如果两者都设置了TTL,则会选取范围小的一个作为过期时间
可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是一样的效果。只是expiration字段是字符串参数,所以要写个int类型的字符串:
参考:https://www.yuque.com/docs/share/45574572-b247-4acc-bbb8-295b611a9452?# 《一、过期时间TTL》
实现定时任务
流程图
web页面创建交换机和队列
交换机
队列
参数详解:
x-message-ttl:
过期时间x-dead-letter-exchange:
代表消息过期后,消息要进入的交换机x-dead-letter-routing-key
是配置消息过期后,进入死信交换机的routing-key
,根据这个key将消息放入不同的队列
死信队列
所有进入这个队列的消息都会被处理
交换机绑定队列
delayqueue2
的key要设置为创建正常队列的x-dead-letter-routing-key
参数,这样当消息过期的时候就可以自动把消息放入delay_queue2这个队列中了
代码实现
发送消息
String msg = "hello word";
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("6000");
messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes());
Message message = new Message(msg.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("delay", "delay",message);
接收消息
@Configuration
public class DelayQueue {
/** 消息交换机的名字*/
public static final String EXCHANGE = "delay";
/** 队列key1*/
public static final String ROUTINGKEY1 = "delay";
/** 队列key2*/
public static final String ROUTINGKEY2 = "delay_key";
/**
* 配置链接信息
* @return
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("120.76.237.8",5672);
connectionFactory.setUsername("kberp");
connectionFactory.setPassword("kberp");
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true); // 必须要设置
return connectionFactory;
}
/**
* 配置消息交换机
* 针对消费者配置
FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
HeadersExchange :通过添加属性key-value匹配
DirectExchange:按照routingkey分发到指定队列
TopicExchange:多关键字匹配
*/
@Bean
public DirectExchange defaultExchange() {
return new DirectExchange(EXCHANGE, true, false);
}
/**
* 配置消息队列2
* 针对消费者配置
* @return
*/
@Bean
public Queue queue() {
return new Queue("delay_queue2", true); //队列持久
}
/**
* 将消息队列2与交换机绑定
* 针对消费者配置
* @return
*/
@Bean
@Autowired
public Binding binding() {
return BindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2);
}
/**
* 接受消息的监听,这个监听会接受消息队列1的消息
* 针对消费者配置
* @return
*/
@Bean
@Autowired
public SimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
container.setQueues(queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
byte[] body = message.getBody();
System.out.println("delay_queue2 收到消息 : " + new String(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
}
});
return container;
}
}
在消息监听中处理需要定时处理的任务就好了,因为Rabbitmq能发送消息,所以可以把任务特征码发过来,比如关闭订单就把订单id发过来,这样就避免了需要查询一下那些订单需要关闭而加重MySQL负担了,毕竟一旦订单量大的话,查询本身也是一件很费IO的事情
总结
基于Rabbitmq实现定时任务,就是将消息设置一个过期时间,放入一个没有读取的队列中,让消息过期后自动转入另外一个队列中,监控这个队列消息的监听处来处理定时任务具体的操作。