过期时间表示可以为消息设置预期的时间,在这个时间内都可以被消费者接受获取,超过这个时间之后会自定删除

RabbitMQ可以对 消息和队列 设置TTL 有两种方法可以设置

设置TTL

给队列设置过期时间

通过对队列设置属性,队列中的所有消息在时间内都可以被消费,过了时间之后将自动被删除

配置类配置

  1. @Configuration
  2. public class TTLRabbitConfig {
  3. // 声明注册direct模式的交换机
  4. @Bean
  5. public DirectExchange ttlDirectExchange() {
  6. return new DirectExchange("ttl_direct_exchange", true, false);
  7. }
  8. // 设置队列过期时间
  9. @Bean
  10. public Queue ttlQueue() {
  11. //设置队列参数
  12. Map<String, Object> args = new HashMap<>();
  13. //x-message-ttl: 表示过期时间
  14. args.put("x-message-ttl",5000);
  15. return new Queue("ttl_direct_queue",true,false,false,args);
  16. }
  17. // 完成交换机和队列,路由key的绑定
  18. @Bean
  19. public Binding ttlBinding() {
  20. return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");
  21. }
  22. }

image.png

测试代码

  1. public boolean ttlDirectOrder(String userId, String productId, Integer num) {
  2. //保存订单业务执行
  3. String orderId = UUID.randomUUID().toString();
  4. log.info("订单业务执行完毕");
  5. log.info("开始异步执行消息分发");
  6. //通过MQ进行消息分发
  7. //定义交换机
  8. String exchangeName = "ttl_direct_exchange";
  9. //消息分发
  10. rabbitTemplate.convertAndSend(exchangeName, "ttl", orderId);
  11. return false;
  12. }

web页面查看

image.png


image.png

给消息设置过期时间

对消息进行单独设置,每条消息设置的TTL可以不同

配置类

  1. @Configuration
  2. public class TTLRabbitConfig {
  3. @Bean
  4. public DirectExchange ttlMessageDirectExchange() {
  5. return new DirectExchange("ttl_message_direct_exchange", true, false);
  6. }
  7. // 设置消息过期时间
  8. @Bean
  9. public Queue ttlMessageQueue() {
  10. return new Queue("ttl_message_direct_queue", true);
  11. }
  12. // 完成交换机和队列,路由key的绑定
  13. @Bean
  14. public Binding ttlMessageBinding() {
  15. return BindingBuilder.bind(ttlMessageQueue()).to(ttlMessageDirectExchange()).with("ttL_message");
  16. }
  17. }

测试代码

  1. public boolean ttlDirectOrderMessage(String userId, String productId, Integer num) {
  2. //保存订单业务执行
  3. String orderId = UUID.randomUUID().toString();
  4. log.info("订单业务执行完毕");
  5. log.info("开始异步执行消息分发");
  6. //通过MQ进行消息分发
  7. //定义交换机
  8. String exchangeName = "ttl_message_direct_exchange";
  9. String routingKey = "ttL_message";
  10. //给消息设置TTL
  11. MessagePostProcessor processor = new MessagePostProcessor() {
  12. @Override
  13. public Message postProcessMessage(Message message) throws AmqpException {
  14. //设置过期时间
  15. message.getMessageProperties().setExpiration("5000");
  16. message.getMessageProperties().setContentEncoding("UTF-8");
  17. return message;
  18. }
  19. };
  20. //消息分发
  21. rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);
  22. return false;
  23. }

关键代码

  1. //给消息设置TTL
  2. MessagePostProcessor processor = new MessagePostProcessor() {
  3. @Override
  4. public Message postProcessMessage(Message message) throws AmqpException {
  5. //设置过期时间
  6. message.getMessageProperties().setExpiration("5000");
  7. message.getMessageProperties().setContentEncoding("UTF-8");
  8. return message;
  9. }
  10. };

如果两者都设置了,以最小设置的TTL时间为准
例如:队列设置为5S,消息设置为3S,则过期时间为最小的时间

消息的过期时间过期之后直接删除,队列的过期可以移动到死信队列中