死信队列

介绍

死信,顾名思义就是无法被消费的消息。一般情况下,生产者生产的消息将直接发送给队列,消费者从队列中获取消息,但是如果出现某些原因可能会导致信息无法被消费,这样的消息如果没有后续的处理,就会变成死信。所以我们通过死信队列,来解决出现死信的问题。

应用场景:

为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。

还有比如说: 用户在商城下单成功并点击去支付后在指定时 间未支付时自动失效

死信的来源

  • 消息 TTL 过期。
  • 队列达到最大长度(队列满了,无法再添加数据到 MQ 中)。
  • 消息被拒绝( basicReject()basicNack() ,且方法的 requeue 参数为 false )。

以上三种情况消息都会成为死信,进入死信队列。

路由流程

进阶功能 - 图1

死信Exchange路由死信消息的流程如下

  1. 生产者将消息发送到 Exchange。
  2. Exchange 将消息路由到 Queue。
  3. 消费者从 Queue 拉取消息。
  4. 消息出现上小节的三种情况,消息成为死信。
  5. Queue 根据 x-dead-letter-exchange 将死信消息发送到死信 Exchange,并根据 x-dead-letter-routing-key 为死信消息设置死信Routing Key。
  6. 死信 Exchange 将死信消息路由到死信 Queue 。

所以在声明队列的时候,我们需要声明 x-dead-letter-exchangex-dead-letter-routing-key 参数来定义死信消息的去向。

死信实战

代码架构图

进阶功能 - 图2

生产者

在生产者的代码中,我们不需要做过多的事,只需要发送消息即可。

  1. public class Producer {
  2. public final static String NORMAL_EXCHANGE = "normal_exchange";
  3. public static void main(String[] args) throws Exception {
  4. Channel channel = RabbitMqUtils.getChannel();
  5. if (channel == null) {
  6. System.out.println("无法获取信道");
  7. return;
  8. }
  9. AMQP.BasicProperties properties = new AMQP.BasicProperties()
  10. .builder()
  11. .expiration(TimeUnit.SECONDS.toMillis(10) + "")
  12. .build();
  13. // 循环发布消息
  14. for (int i = 0; i < 10; i++) {
  15. String msg = "hello " + i;
  16. channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, msg.getBytes(StandardCharsets.UTF_8));
  17. System.out.println("Producer 发布消息:" + msg);
  18. }
  19. RabbitMqUtils.closeAll(channel);
  20. }
  21. }

正常消费者

  1. public class Consumer01 {
  2. public final static String NORMAL_EXCHANGE = "normal_exchange";
  3. public final static String DEAD_EXCHANGE = "dead_exchange";
  4. public final static String NORMAL_QUEUE = "normal_queue";
  5. public final static String DEAD_QUEUE = "dead_queue";
  6. public static void main(String[] args) throws Exception {
  7. Channel channel = RabbitMqUtils.getChannel();
  8. if (channel == null) {
  9. System.out.println("无法获取信道");
  10. return;
  11. }
  12. // 声明正常交换机和死信交换机
  13. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
  14. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
  15. // arguments 为正常队列需要的参数
  16. Map<String, Object> arguments = new HashMap<>();
  17. // 设置过期时间,但是我们一般不在声明交换机的时候设置,而是在发布消息的时候设置过期时间
  18. // arguments.put("x-message-ttl", TimeUnit.SECONDS.toMillis(10));
  19. // 设置死信交换机
  20. arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
  21. // 设置死信routingKey
  22. arguments.put("x-dead-letter-routing-key", "lisi");
  23. // 设置正常队列的长度
  24. // arguments.put("x-max-length", 6);
  25. // 声明正常队列和死信队列
  26. channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
  27. channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
  28. // 绑定正常交换机与队列
  29. channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
  30. channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
  31. System.out.println("Consumer01 等待接收消息...");
  32. // 接收消息回调
  33. DeliverCallback deliverCallback = (consumerTag, message) -> {
  34. String routingKey = message.getEnvelope().getRoutingKey();
  35. String msg = new String(message.getBody());
  36. if ("hello 2".equals(msg)) {
  37. System.out.println("来自 routingKey[" + routingKey + "] 的消息[已拒绝] -> " + msg);
  38. // 第二个参数需要为false,表示不要重新入队,才能进入到死信队列
  39. channel.basicReject(message.getEnvelope().getDeliveryTag(), true);
  40. } else {
  41. System.out.println("来自 routingKey[" + routingKey + "] 的消息 -> " + msg);
  42. // 手动应答
  43. channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
  44. }
  45. };
  46. // 接收消息
  47. channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, (consumerTag, message) -> {
  48. });
  49. }
  50. }

死信消费者

  1. public class Consumer02 {
  2. public final static String DEAD_QUEUE = "dead_queue";
  3. public static void main(String[] args) throws Exception {
  4. Channel channel = RabbitMqUtils.getChannel();
  5. if (channel == null) {
  6. System.out.println("无法获取信道");
  7. return;
  8. }
  9. System.out.println("Consumer02 等待接收消息...");
  10. // 接收消息回调
  11. DeliverCallback deliverCallback = (consumerTag, message) -> {
  12. String routingKey = message.getEnvelope().getRoutingKey();
  13. String msg = new String(message.getBody());
  14. System.out.println("来自 routingKey[" + routingKey + "] 的消息 -> " + msg);
  15. // 手动应答
  16. channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
  17. };
  18. // 接收消息
  19. channel.basicConsume(DEAD_QUEUE, false, deliverCallback, (consumerTag, message) -> {
  20. });
  21. }
  22. }

测试

上面的代码,我们需要最先启动正常的消费者,正常消费者会创建两个交换机和两个队列,然后我们立即关闭正常消费者,模拟正常消费者突然宕机的情况,然后我们就可以开启生产者发布消息了:

进阶功能 - 图3

发布消息后,我们开启死信消费者,等待10s后,消息过期就会成为死信,死信消费者应该可以全部获取到,这就是消息 TTL 过期的死信来源。

进阶功能 - 图4

我们还可以取消正常消费者代码第 26 行的注释,让正常队列的长度为6,那么在生产者一次性发布10条消息,同时正常消费者宕机的情况下,死信队列就会收到多余的 4 条消息,这就是达到队列最大长度后的死信来源

进阶功能 - 图5

最后,我们依次开启正常消费者 -> 死信消费者 -> 生产者,在上面的代码中,我已经配置了当消息为 hello 2 的时候,消息会被正常消费者拒绝,并且不会 requeue ,在这种情况下,消息也会成为死信,这就是消息被拒绝的死信来源

正常消费者:

进阶功能 - 图6

死信消费者:

进阶功能 - 图7

延迟队列

介绍

延时队列内部是有序的,最重要的特性就是体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理。简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

使用场景

  • 订单在十分钟之内未支付则自动取消。

  • 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。

  • 用户注册成功后,如果三天内没有登陆则进行短信提醒。

  • 用户发起退款,如果三天内没有得到处理则通知相关运营人员。

  • 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。

这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务。

看起来似乎我们不使用延时队列好像也可以完成上面的功能,我们只需要使用定时任务,一直轮询查询数据,每秒查一次,取出需要处理的数据进行处理就好了。其实对于数据量较小的情况,确实可以这样做。

但是对于数据量大,并且时效性要求比较高的场景,这样做就不太合适了,短期内未支付的订单数据可能会有很多,活动期间剩余成百上万的数据两,不可能在一秒内完成所有订单的检查,同时还会给数据库带来压力,无法满足业务要求而且性能低下,所以使用延时队列还是非常有必要的。

进阶功能 - 图8

消息延迟

TTL 全称为 Time To Live,意味生存时间,在 RabbitMq 中消息的 TTL,也就是消息的最大存活时间,单位是毫秒。

我们可以给队列或者单独给某条消息设置 TTL:

  • 如果给队列设置 TTL,那么整个队列中的所有消息都具有相同的存活时间。
  • 如果给某条消息设置 TTL,那么只设置了该消息的存活时间。

如果同时配置了队列的 TTL 和消息的 TTL,那么较小的那个值将会被使用。当消息的存活时间达到 TTL 值时,若还没有被消费,则消息会成为死信。

消息设置 TTL

未使用 SpringBoot 方式

  1. AMQP.BasicProperties properties = new AMQP.BasicProperties()
  2. .builder()
  3. .expiration(TimeUnit.SECONDS.toMillis(10) + "") // 设置消息的最大存活时间,单位毫秒
  4. .build();
  5. channel.basicPublish(NORMAL_EXCHANGE // 交换机
  6. , "routingKey" // routingKey
  7. , properties // 消息参数,即上面的properties对象
  8. , msg.getBytes(StandardCharsets.UTF_8)); // 消息内容

整合 SpringBoot 方式

  1. rabbitTemplate.convertAndSend(RabbitMqConstants.X_NORMAL_EXCHANGE // 交换机
  2. , RabbitMqConstants.X_BINDING_QC // routingKey
  3. , msg.getBytes(StandardCharsets.UTF_8) // 消息内容
  4. , message -> { // 消息后置处理器
  5. // 设置消息过期时间,单位毫秒
  6. message.getMessageProperties().setExpiration(ttl);
  7. return message;
  8. });

队列设置 TTL

未使用 SpringBoot 方式

  1. // arguments 为队列需要的参数
  2. Map<String, Object> arguments = new HashMap<>();
  3. // 设置过期时间,单位毫秒
  4. arguments.put("x-message-ttl", TimeUnit.SECONDS.toMillis(10));
  5. // 声明队列时需要传入参数
  6. channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);

整合 SpringBoot 方式

  1. @Bean
  2. public Queue queue() {
  3. return QueueBuilder
  4. .durable(RabbitMqConstants.QUEUE_B) // 队列需要持久化,并设置队列名
  5. .ttl(TimeUnit.SECONDS.toMillis(40)) // 队列的最大存活时间
  6. .build();
  7. }

两种方式的区别

使用队列方式设置 TTL,消息过期之后,则会被队列丢弃(若配置了死信队列,则进入死信队列);但是使用的消息方式设置的 TTL,在消息过期之后可能并不会被立刻丢弃,这是因为消息是在即将投递给消费者的时候才判断是否过期的,那么即使消息过期,但是在这个消息之前的其他消息未被消费或过期,则该消息仍然积压在队列中。

关于这两种方式的区别,在后面实战部分可以更明显的感觉到,如果这里不理解可以待会儿在实战部分理解。

另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

整合 SpringBoot

引入依赖

  1. <!--RabbitMQ 依赖-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>
  6. <!-- springboot整合web启动器 -->
  7. <dependency>
  8. <groupId>org.springframework.boot</groupId>
  9. <artifactId>spring-boot-starter-web</artifactId>
  10. </dependency>
  11. <!-- 阿里巴巴的json工具 -->
  12. <dependency>
  13. <groupId>com.alibaba</groupId>
  14. <artifactId>fastjson</artifactId>
  15. <version>1.2.47</version>
  16. </dependency>
  17. <!-- swagger相关依赖[可选] -->
  18. <dependency>
  19. <groupId>io.springfox</groupId>
  20. <artifactId>springfox-swagger2</artifactId>
  21. <version>2.9.2</version>
  22. </dependency>
  23. <!-- swagger相关依赖[可选] -->
  24. <dependency>
  25. <groupId>io.springfox</groupId>
  26. <artifactId>springfox-swagger-ui</artifactId>
  27. <version>2.9.2</version>
  28. </dependency>
  29. <!--RabbitMQ 测试依赖-->
  30. <dependency>
  31. <groupId>org.springframework.amqp</groupId>
  32. <artifactId>spring-rabbit-test</artifactId>
  33. <scope>test</scope>
  34. </dependency>

修改 application.yml 配置文件

  1. spring:
  2. rabbitmq:
  3. host: 192.168.20.132
  4. port: 5672
  5. username: admin
  6. password: 123456

添加 Swagger 配置类[可选]

  1. @Configuration
  2. public class SwaggerConfig {
  3. @Bean
  4. public Docket docket() {
  5. return new Docket(DocumentationType.SWAGGER_2)
  6. .apiInfo(apiInfo())
  7. .groupName("WebApi")
  8. .select()
  9. .build();
  10. }
  11. //配置文档信息
  12. private ApiInfo apiInfo() {
  13. Contact contact = new Contact("留白", "http://www.liubaiblog.top", "123456789@qq.com");
  14. return new ApiInfo(
  15. "RabbitMQ 接口文档", // 标题
  16. "本文档描述了 RabbitMQ 微服务接口定义", // 描述
  17. "v1.0", // 版本
  18. null, // 组织链接
  19. contact, // 联系人信息
  20. null,
  21. null,
  22. new ArrayList<>()// 扩展
  23. );
  24. }
  25. }

SpringBoot 整合实战

前一小节我们介绍了死信队列,刚刚又介绍了 TTL,至此利用 RabbitMQ 实现延时队列的两大要素已经集齐,接下来只需要将它们进行融合,再加入一点点调味料,延时队列就可以新鲜出炉了。

想想看,延时队列,不就是想要消息延迟多久被处理吗,TTL 则刚好能让消息在延迟多久之后成为死信,另一方面, 成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就完事了,因为里面的消息都是希望被立即处理的消息。

代码架构图

创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交 换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:

进阶功能 - 图9

常量类

根据上面架构图的关系,我们先定义如下常量,方便后续的使用。

  1. public class RabbitMqConstants {
  2. public final static String X_NORMAL_EXCHANGE = "X";
  3. public final static String Y_DEAD_EXCHANGE = "Y";
  4. public final static String QUEUE_A = "QA";
  5. public final static String QUEUE_B = "QB";
  6. public final static String QUEUE_D = "QD";
  7. public final static String X_BINDING_QA = "XA";
  8. public final static String X_BINDING_QB = "XB";
  9. public final static String Y_BINDING_QD = "YD";
  10. public final static int QA_TTL = (int) TimeUnit.SECONDS.toMillis(10);
  11. public final static int QB_TTL = (int) TimeUnit.SECONDS.toMillis(40);
  12. }

配置类

之前声明交换机和队列的工作,我们都是交给消费者执行的,但是 SpringBoot 整合之后,我们可以直接在配置类中声明,不用那么麻烦。

在配置类中:

  • XxxExchange :表示声明一个交换机。
  • Queue :表示声明一个队列,注意包的路径为 org.springframework.amqp.core
  • Binding :声明一个绑定关系。

代码如下:

  1. import org.springframework.amqp.core.*;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import top.liubaiblog.springbootrabbitmq.constant.RabbitMqConstants;
  5. @Configuration
  6. public class RabbitMqConfig {
  7. @Bean // 声明x正常交换机
  8. public DirectExchange xExchange() {
  9. return new DirectExchange(RabbitMqConstants.X_NORMAL_EXCHANGE);
  10. }
  11. @Bean // 声明y死信交换机
  12. public DirectExchange yExchange() {
  13. return new DirectExchange(RabbitMqConstants.Y_DEAD_EXCHANGE);
  14. }
  15. @Bean // 声明队列A,设置延迟10s
  16. public Queue queueA() {
  17. return QueueBuilder
  18. .durable(RabbitMqConstants.QUEUE_A)
  19. .ttl(RabbitMqConstants.QA_TTL)
  20. .deadLetterExchange(RabbitMqConstants.Y_DEAD_EXCHANGE)
  21. .deadLetterRoutingKey(RabbitMqConstants.Y_BINDING_QD)
  22. .build();
  23. }
  24. @Bean // 声明队列B,设置延迟40s
  25. public Queue queueB() {
  26. return QueueBuilder
  27. .durable(RabbitMqConstants.QUEUE_B)
  28. .ttl(RabbitMqConstants.QB_TTL)
  29. .deadLetterExchange(RabbitMqConstants.Y_DEAD_EXCHANGE)
  30. .deadLetterRoutingKey(RabbitMqConstants.Y_BINDING_QD)
  31. .build();
  32. }
  33. @Bean // 声明死信队列D
  34. public Queue queueD() {
  35. return QueueBuilder
  36. .durable(RabbitMqConstants.QUEUE_D)
  37. .build();
  38. }
  39. @Bean // 绑定X交换机和队列A
  40. public Binding xExchangeBindingQa() {
  41. return BindingBuilder
  42. .bind(queueA())
  43. .to(xExchange())
  44. .with(RabbitMqConstants.X_BINDING_QA);
  45. }
  46. @Bean // 绑定X交换机和队列B
  47. public Binding xExchangeBindingQb() {
  48. return BindingBuilder
  49. .bind(queueB())
  50. .to(xExchange())
  51. .with(RabbitMqConstants.X_BINDING_QB);
  52. }
  53. @Bean // 绑定Y死信交换机和死信队列D
  54. public Binding yExchangeBindingQd() {
  55. return BindingBuilder
  56. .bind(queueD())
  57. .to(yExchange())
  58. .with(RabbitMqConstants.Y_BINDING_QD);
  59. }
  60. }

生产者

浏览器访问 http://localhost:8080/ttl/sendMsg/{msg} 接口,即可生产消息。

  1. @Slf4j
  2. @RestController
  3. @RequestMapping("/ttl")
  4. public class SendMsgController {
  5. @Autowired
  6. private RabbitTemplate rabbitTemplate;
  7. @PostMapping("/sendMsg/{msg}")
  8. public String sendMsg(@PathVariable("msg") String msg) {
  9. log.info("[{}] 发送消息 -> {}", new Date(), msg);
  10. rabbitTemplate.convertAndSend(RabbitMqConstants.X_NORMAL_EXCHANGE // 交换机
  11. , RabbitMqConstants.X_BINDING_QA // routingKey
  12. , msg.getBytes(StandardCharsets.UTF_8)); // 消息
  13. rabbitTemplate.convertAndSend(RabbitMqConstants.X_NORMAL_EXCHANGE
  14. , RabbitMqConstants.X_BINDING_QB
  15. , msg.getBytes(StandardCharsets.UTF_8));
  16. return "success";
  17. }
  18. }

消费者

消费者的方法中需要标识 @RabbitListener 注解,表示这个方法监听某个队列中的消息。

  1. @Slf4j
  2. @Component
  3. public class DeadLetterQueueConsumer {
  4. @RabbitListener(queues = RabbitMqConstants.QUEUE_D)
  5. public void receiveD(Message message, Channel channel) {
  6. String body = new String(message.getBody());
  7. log.info("[{}] 接收到死信队列消息 -> {}", new Date(), body);
  8. }
  9. }

测试

发送请求:http://localhost:8080/ttl/sendMsg/helloworld

进阶功能 - 图10

第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息, 然后被消费掉,这样一个延时队列就打造完成了。

不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列?这里只有 10S 和 40S 两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然 后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?

显然,这样是不太合适的,我们还需要对延迟队列进行优化。

优化

在这里新增了一个队列 QC,绑定关系如下,该队列不设置 TTL 时间:

进阶功能 - 图11

但是队列不设置 TTL 时间,我们可以将消息设置 TTL 时间,这样我们就可以任意的指定消息什么时候过期。

常量类

  1. public class RabbitMqConstants {
  2. // ...
  3. public final static String QUEUE_C = "QC";
  4. public final static String X_BINDING_QC = "XC";
  5. }

配置类

  1. @Configuration
  2. public class RabbitMqConfig {
  3. // ...
  4. @Bean
  5. public Queue queueC() {
  6. return QueueBuilder
  7. .durable(RabbitMqConstants.QUEUE_C)
  8. .deadLetterExchange(RabbitMqConstants.Y_DEAD_EXCHANGE)
  9. .deadLetterRoutingKey(RabbitMqConstants.Y_BINDING_QD)
  10. .build();
  11. }
  12. @Bean
  13. public Binding xExchangeBindingQc() {
  14. return BindingBuilder
  15. .bind(queueC())
  16. .to(xExchange())
  17. .with(RabbitMqConstants.X_BINDING_QC);
  18. }
  19. }

生产者

  1. @Slf4j
  2. @RestController
  3. @RequestMapping("/ttl")
  4. public class SendMsgController {
  5. @Autowired
  6. private RabbitTemplate rabbitTemplate;
  7. // ...
  8. @PostMapping("/sendTtlMsg/{msg}/{ttlSeconds}")
  9. public String sendTtlMsg(@PathVariable String msg, @PathVariable Integer ttlSeconds) {
  10. log.info("[{}] 发送消息并延迟{}s -> {}", new Date(), ttlSeconds, msg);
  11. String ttl = TimeUnit.SECONDS.toMillis(ttlSeconds) + "";
  12. rabbitTemplate.convertAndSend(RabbitMqConstants.X_NORMAL_EXCHANGE // 交换机
  13. , RabbitMqConstants.X_BINDING_QC // routingKey
  14. , msg.getBytes(StandardCharsets.UTF_8) // 消息
  15. , message -> { // 消息后置处理器
  16. // 设置消息过期时间
  17. message.getMessageProperties().setExpiration(ttl);
  18. return message;
  19. });
  20. return "success";
  21. }
  22. }

问题

依次发送请求:

  • http://localhost:8080/ttl/sendTtlMsg/hello1/20
  • http://localhost:8080/ttl/sendTtlMsg/hello2/2

我的本意是想要 hello1 消息在延时 20s 后被消费,hello2 消息在延时 2s 后被消费,但是看下图结果,就会发现 hello2 依然延迟了 20s 才被消费。

进阶功能 - 图12

这就是出现的问题,还记得我们在消息延迟小节说队列 TTL 和消息 TTL 的区别吗,我们使用消息 TTL 之后,消息只有在即将投递的时候才会判断是否过期,但是在 hello2 消息之前,我们还有 hello1 消息没有被消费,所以导致 hello2 消息即不会被消费,更不会被判断是否过期,从而积压在队列中。

等到 20s 后,hello1 消息过期,被投递到死信队列,然后才到 hello2hello2 再被判定为过期,也转入死信队列,从而出现上图的结果。

但是很明显上图的结果不是我们想要的,我们想要的结果是 hello2 2s 后先被消费,然后 hello1 20s 后再被消费。

RabbitMq 插件实现延迟队列

为了解决上面的问题,我们可以使用 RabbitMq 为我们提供的插件。RabbitMq 支持很多插件,我们可以访问其官网获取,我们这次需要的是 rabbitmq_delayed_message_exchange 插件,访问下面地址可以直接跳转到插件的 GitHub 下载。

获取插件

插件地址https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases。

进阶功能 - 图13

在 Linux 中,进入如下目录,并将 ez 文件放入到此目录下:

  1. $ cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.1/plugins/

执行如下命令安装插件

  1. $ rabbitmq-plugins enable rabbitmq_delayed_message_exchange

如果 Web 管理界面后,有如下效果表示安装成功,如果没有出现可以尝试重启服务。

进阶功能 - 图14

代码架构图

使用了延迟插件之后,我们可以选择 x-delayed-message 类型的交换机,这个方式和之前的很不同,在之前,我们其实是基于的死信队列实现的延迟队列,是等待消息过期之后进入死信队列后,我们再消费死信队列的消息。

但是使用延迟插件后,我们可以不借助死信队列,消息的延迟在交换机位置,消息会在交换机延迟到指定时间后,再转发给队列,然后消费者可以直接消费队列中的消息。

进阶功能 - 图15

在这里新增了一个队列 delayed.queue ,一个自定义交换机 delayed.exchange ,我们下面根据上图编写代码。

常量类

  1. public class RabbitMqConstants {
  2. public final static String DELAY_EXCHANGE = "delayed.exchange";
  3. public final static String DELAY_QUEUE = "delayed.queue";
  4. public final static String DELAY_EXCHANGE_BINDING_DELAY_QUEUE = "delayed.routingKey";
  5. }

配置类

注意,由于这里的交换机不是之前的几种常见类型的交换机,所以在声明交换机的时候,我们需要使用 CustomExchange 类,表示这是一个自定义交换机。

  1. @Configuration
  2. public class DelayRabbitMqConfig {
  3. @Bean // 声明自定义交换机
  4. public CustomExchange delayExchange() {
  5. Map<String, Object> arguments = new HashMap<>(3);
  6. arguments.put("x-delayed-type", "direct");
  7. return new CustomExchange(RabbitMqConstants.DELAY_EXCHANGE // 交换机名称
  8. , "x-delayed-message" // 自定义消息类型
  9. , true // 是否持久化
  10. , false // 是否自动删除
  11. , arguments); // 参数列表
  12. }
  13. @Bean // 声明延迟队列
  14. public Queue delayQueue() {
  15. return QueueBuilder
  16. .durable(RabbitMqConstants.DELAY_QUEUE)
  17. .build();
  18. }
  19. @Bean // 声明routingKey
  20. public Binding delayExchangeBindingDelayQueue() {
  21. return BindingBuilder
  22. .bind(delayQueue())
  23. .to(delayExchange())
  24. .with(RabbitMqConstants.DELAY_EXCHANGE_BINDING_DELAY_QUEUE)
  25. .noargs();
  26. }
  27. }

生产者

生产者的代码和之前基本一致,但是我们这里使用另一种方式发送,这种方式的效果和之前是一致的。但是在 MessageProperties 中,不是使用 setExpiration() 方法,而是使用 setDelay() 方法表示设置延迟时间。

  1. @Slf4j
  2. @RestController
  3. @RequestMapping("/ttl")
  4. public class SendMsgController {
  5. @Autowired
  6. private RabbitTemplate rabbitTemplate;
  7. @PostMapping("/sendDelayMsg/{msg}/{ttlSeconds}")
  8. public String sendDelayMsg(@PathVariable String msg, @PathVariable Integer ttlSeconds) {
  9. log.info("[{}] 发送自定义延迟队列消息并延迟{}s -> {}", new Date(), ttlSeconds, msg);
  10. int ttl = (int) TimeUnit.SECONDS.toMillis(ttlSeconds);
  11. MessageProperties messageProperties = new MessageProperties();
  12. messageProperties.setDelay(ttl);
  13. Message message = new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties);
  14. rabbitTemplate.send(RabbitMqConstants.DELAY_EXCHANGE
  15. , RabbitMqConstants.DELAY_EXCHANGE_BINDING_DELAY_QUEUE
  16. , message);
  17. return "success";
  18. }
  19. }

消费者

  1. @Slf4j
  2. @Component
  3. public class DeadLetterQueueConsumer {
  4. @RabbitListener(queues = RabbitMqConstants.DELAY_QUEUE)
  5. public void receiveDelayQueue(Message message) {
  6. String body = new String(message.getBody());
  7. log.info("[{}] 接收到自定义延迟队列消息 -> {}", new Date(), body);
  8. }
  9. }

测试

依次发送请求:

  • http://localhost:8080/ttl/sendDelayMsg/hello1/20
  • http://localhost:8080/ttl/sendDelayMsg/hello2/2

进阶功能 - 图16

从这次的结果来看,是符合我们的预期的,我们是希望 hello2 消息延迟 2s 后就交给消费者消费,结果也确实是这样。

阶段总结

延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。

另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。

当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景。

发布确认高级

在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败, 导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢? 特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢?

为了方便,我们还是使用 SpringBoot 的方式对 RabbitMq 整合。

发布确认

代码架构图

进阶功能 - 图17

application.yml 配置文件

除了 rabbitMq 地址相关的信息外,我们还需要在配置文件中添加如下内容:

  1. spring:
  2. rabbitmq:
  3. publisher-confirm-type: correlated

publisher-confirm-type 一共有三个值:

  • NONE :禁用发布确认模式,是默认值。
  • CORRELATED :发布消息成功到交换器后会触发回调方法。
  • SIMPLE :经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法;其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms()waitForConfirmsOrDie 方法 等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker。

常量类

  1. public class RabbitMqConstants {
  2. public final static String CONFIRM_EXCHANGE = "confirm.exchange";
  3. public final static String CONFIRM_QUEUE = "confirm.queue";
  4. public final static String EXCHANGE_BINDING_QUEUE = "confirm.eq";
  5. }

配置类

  1. @Configuration
  2. public class RabbitMqConfig {
  3. @Bean // 声明交换机
  4. public DirectExchange confirmExchange() {
  5. return new DirectExchange(RabbitMqConstants.CONFIRM_EXCHANGE);
  6. }
  7. @Bean // 声明队列
  8. public Queue confirmQueue() {
  9. return QueueBuilder
  10. .durable(RabbitMqConstants.CONFIRM_QUEUE)
  11. .build();
  12. }
  13. @Bean // 声明绑定关系
  14. public Binding binding() {
  15. return BindingBuilder
  16. .bind(confirmQueue())
  17. .to(confirmExchange())
  18. .with(RabbitMqConstants.EXCHANGE_BINDING_QUEUE);
  19. }
  20. }

生产者

在生产者这里的代码需要有些不同,之前的方法中我们是没有指定消息的ID的,但是其实我们生产的消息应该携带一个消息ID的,携带这个消息ID我们可以通过创建一个 CorrelationData 类实现,如下。

  1. @Slf4j
  2. @RestController
  3. @RequestMapping("/producer")
  4. public class ProducerController {
  5. @Autowired
  6. private RabbitTemplate rabbitTemplate;
  7. // 请求这个接口,我们发送两个消息,一个给存在的交换机,一个给不存在的交换机
  8. @PostMapping("/send/{msg}")
  9. public String sendMsg(@PathVariable String msg) {
  10. // 给存在的交换机发送消息
  11. rabbitTemplate.convertAndSend(RabbitMqConstants.CONFIRM_EXCHANGE
  12. , RabbitMqConstants.EXCHANGE_BINDING_QUEUE
  13. , msg.getBytes(StandardCharsets.UTF_8)
  14. , new CorrelationData("1"));
  15. // 给不存在的交换机发送消息
  16. rabbitTemplate.convertAndSend(RabbitMqConstants.CONFIRM_EXCHANGE + "abcd"
  17. , RabbitMqConstants.EXCHANGE_BINDING_QUEUE
  18. , msg.getBytes(StandardCharsets.UTF_8)
  19. , new CorrelationData("2"));
  20. log.info("发送了消息 -> {}", msg);
  21. return "success";
  22. }
  23. }

消费者

  1. @Slf4j
  2. @Component
  3. public class ConsumerListener {
  4. @RabbitListener(queues = RabbitMqConstants.CONFIRM_QUEUE)
  5. public void confirm(Message message) throws IOException {
  6. String body = new String(message.getBody());
  7. log.info("消费者接收到消息 -> {}", body);
  8. }
  9. }

消息确认的回调接口

我们可以定义一个用于消息确认的回调接口,这个回调接口不管在消息接收到还是没有接收到都会被触发,通过 ack 属性可以判断消息是否被接收到。

这个回调接口要求继承 RabbitTemplate.ConfirmCallback 接口,这个是个函数式接口,我们也可以不单独定义类,而是使用 Lamdba 表达式,但是本例还是使用声明一个新类的方式。

最后我们需要把我们声明的回调接口,注入 rabbitTemplate 当中,这样才能生效。

  1. @Component
  2. @Slf4j
  3. public class MessageCallback implements RabbitTemplate.ConfirmCallback {
  4. @Autowired
  5. private RabbitTemplate rabbitTemplate;
  6. // 初始化方法,在构造器和rabbitTemplate注入之后执行
  7. @PostConstruct
  8. private void init() {
  9. // 将本类注入到rabbitTemplate
  10. rabbitTemplate.setConfirmCallback(this);
  11. }
  12. /**
  13. * 消息确认
  14. *
  15. * @param correlationData 相关数据,这个需要生产者给出
  16. * @param ack 是否应答
  17. * @param errorCause 错误原因
  18. */
  19. @Override
  20. public void confirm(CorrelationData correlationData, boolean ack, String errorCause) {
  21. String id = correlationData != null ? correlationData.getId() : "";
  22. if (ack) {
  23. log.info("交换机收到id为{}的消息", id);
  24. } else {
  25. log.info("交换机未收到id为{}的消息,由于原因 -> {}", id, errorCause);
  26. }
  27. }
  28. }

结果

访问地址 http://localhost:8081/producer/send/hello ,会发送两个消息,一个 ID 为 1,一个 ID 为 2,很明显,由于我们在生产者端声明了一个不存在的交换机,所以 ID 为 2 的消息必然会收不到。

进阶功能 - 图18

但是上面的效果就已经够了吗?其实我们可以尝试一下生产者端向一个不存在的 routingKey 发送消息,这个时候,交换机无法通过 routingKey 找到对应的队列投递消息,就会把消息丢弃,但是在回调接口中,并不会体现出来,但实际上消费者只接受到了一个消息。

进阶功能 - 图19

回退消息

Mandatory 参数

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。

那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

开启这个参数的方式有两种,一种是在配置文件中开启,一种是在 rabbitTemplate 中设置此参数为 true

配置文件

  1. spring:
  2. rabbitmq:
  3. publisher-confirm-type: correlated
  4. publisher-returns: true

rabbitTemplate

  1. rabbitTemplate.setMandatory(true);
  2. rabbitTemplate.setReturnsCallback(<消息回退的接口>);

生产者

  1. @Slf4j
  2. @RestController
  3. @RequestMapping("/producer")
  4. public class ProducerController {
  5. @Autowired
  6. private RabbitTemplate rabbitTemplate;
  7. // 请求这个接口,我们发送两个消息,一个给存在的交换机,一个给不存在的交换机
  8. @PostMapping("/send/{msg}")
  9. public String sendMsg(@PathVariable String msg) {
  10. // 给存在的交换机发送消息
  11. rabbitTemplate.convertAndSend(RabbitMqConstants.CONFIRM_EXCHANGE
  12. , RabbitMqConstants.EXCHANGE_BINDING_QUEUE
  13. , msg.getBytes(StandardCharsets.UTF_8)
  14. , new CorrelationData("1"));
  15. // 给不存在的交换机发送消息
  16. rabbitTemplate.convertAndSend(RabbitMqConstants.CONFIRM_EXCHANGE
  17. , RabbitMqConstants.EXCHANGE_BINDING_QUEUE + "dada"
  18. , msg.getBytes(StandardCharsets.UTF_8)
  19. , new CorrelationData("2"));
  20. log.info("发送了消息 -> {}", msg);
  21. return "success";
  22. }
  23. }

消息回退接口

  1. @Component
  2. @Slf4j
  3. public class MessageCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
  4. @Autowired
  5. private RabbitTemplate rabbitTemplate;
  6. // 初始化方法,在构造器和rabbitTemplate注入之后执行
  7. @PostConstruct
  8. private void init() {
  9. // 将本类注入到rabbitTemplate
  10. rabbitTemplate.setConfirmCallback(this);
  11. // 如果在配置文件中配置类Mandatory参数,这里可以不设置
  12. // rabbitTemplate.setMandatory(true);
  13. rabbitTemplate.setReturnsCallback(this);
  14. }
  15. /**
  16. * 消息确认
  17. *
  18. * @param correlationData 相关数据,这个需要生产者给出
  19. * @param ack 是否应答
  20. * @param errorCause 错误原因
  21. */
  22. @Override
  23. public void confirm(CorrelationData correlationData, boolean ack, String errorCause) {
  24. String id = correlationData != null ? correlationData.getId() : "";
  25. if (ack) {
  26. log.info("交换机收到id为{}的消息", id);
  27. } else {
  28. log.info("交换机未收到id为{}的消息,由于原因 -> {}", id, errorCause);
  29. }
  30. }
  31. /**
  32. * 处理被回退的消息
  33. *
  34. * @param returnedMessage 被回退的消息
  35. */
  36. @Override
  37. public void returnedMessage(ReturnedMessage returnedMessage) {
  38. String msg = new String(returnedMessage.getMessage().getBody());
  39. String cause = returnedMessage.getReplyText();
  40. String routingKey = returnedMessage.getRoutingKey();
  41. log.error("消息 [{}] 被交换机退回,退回原因:{},路由key为{}", msg, cause, routingKey);
  42. }
  43. }

结果

访问地址 http://localhost:8081/producer/send/hello ,生产者发布了向不存在的 routingKey 发送消息,消息会被退回。

进阶功能 - 图20

备份交换机

设置 Mandatory 参数后,可以实现回退消息,但是还有一个弊端就是对回退的消息,我们应该怎么处理呢?我们可以使用日志打印,但其实使用日志的方式处理是一种很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。

如果既不想丢失消息,也不想增加生产者的复杂性,我们就可以考虑使用备份交换机,备份交换机就相当于主交换机的备胎,当主交换机遇到无法路由的消息的时候,就会交给备份交换机处理,备份交换机通常是 fanout 类型,可以把收到的消息发布给所有绑定的队列,所以我们也可以创建多个消费者,对这个异常的消息进行处理。

比如我们可以建立一个报警队列,用独立的消费者进行监测和报警。

代码架构图

进阶功能 - 图21

常量类

  1. public class RabbitMqConstants {
  2. // ...
  3. public final static String BACKUP_EXCHANGE = "backup.exchange";
  4. public final static String BACKUP_QUEUE = "backup.queue";
  5. public final static String WARNING_QUEUE = "warning.queue";
  6. // ...
  7. }

配置类

  1. @Configuration
  2. public class RabbitMqConfig {
  3. @Bean //! 声明交换机
  4. public DirectExchange confirmExchange() {
  5. return ExchangeBuilder
  6. .directExchange(RabbitMqConstants.CONFIRM_EXCHANGE) // 直接交换机
  7. .durable(true) // 是否持久化
  8. .alternate(RabbitMqConstants.BACKUP_EXCHANGE) // 备份交换机的名字
  9. .build();
  10. }
  11. @Bean // 声明队列
  12. public Queue confirmQueue() {
  13. return QueueBuilder
  14. .durable(RabbitMqConstants.CONFIRM_QUEUE)
  15. .build();
  16. }
  17. @Bean // 声明绑定关系
  18. public Binding binding() {
  19. return BindingBuilder
  20. .bind(confirmQueue())
  21. .to(confirmExchange())
  22. .with(RabbitMqConstants.EXCHANGE_BINDING_QUEUE);
  23. }
  24. @Bean // 声明备份交换机,备份交换机采用广播方式发送消息,类型为fanout
  25. public FanoutExchange backupExchange() {
  26. return new FanoutExchange(RabbitMqConstants.BACKUP_EXCHANGE);
  27. }
  28. @Bean // 备份队列
  29. public Queue backupQueue() {
  30. return QueueBuilder
  31. .durable(RabbitMqConstants.BACKUP_QUEUE)
  32. .build();
  33. }
  34. @Bean // 备份报警队列
  35. public Queue warningQueue() {
  36. return QueueBuilder
  37. .durable(RabbitMqConstants.WARNING_QUEUE)
  38. .build();
  39. }
  40. @Bean // 绑定备份交换机和备份队列
  41. public Binding backupExchangeBindingBackupQueue() {
  42. return BindingBuilder
  43. .bind(backupQueue())
  44. .to(backupExchange());
  45. }
  46. @Bean // 绑定备份交换机和报警队列
  47. public Binding backupExchangeBindingWarningQueue() {
  48. return BindingBuilder
  49. .bind(warningQueue())
  50. .to(backupExchange());
  51. }
  52. }

消费者

  1. @Slf4j
  2. @Component
  3. public class ConsumerListener {
  4. // 正常消费者
  5. @RabbitListener(queues = RabbitMqConstants.CONFIRM_QUEUE)
  6. public void confirm(Message message) throws IOException {
  7. String body = new String(message.getBody());
  8. log.info("消费者接收到消息 -> {}", body);
  9. }
  10. // 备份的消费者
  11. @RabbitListener(queues = RabbitMqConstants.BACKUP_QUEUE)
  12. public void backup(Message message) {
  13. String body = new String(message.getBody());
  14. log.info("备份消费者接收到消息 -> {}", body);
  15. }
  16. // 报警的消费者
  17. @RabbitListener(queues = RabbitMqConstants.WARNING_QUEUE)
  18. public void warning(Message message) {
  19. String body = new String(message.getBody());
  20. log.error("警告 -> 消息 [{}] 未被正常消费", body);
  21. }
  22. }

结果

Tip:在开始测试之前,需要把原来的 confirm.exchange 交换机删除,因为在配置类中我们添加了新的配置,不然程序会报错!

访问地址 http://localhost:8081/producer/send/hello ,如果出现不可路由的消息,交给交给备份交换机处理。

进阶功能 - 图22

mandatory 参数和备份交换机一起使用的时候,默认备份交换机的优先级更高!

其他相关知识点

幂等性

介绍

每个队列的消息,都应该保证只被消费者消费一次,但是在某些情况下,消息可能会被多次消费。

比如:

  • 消费者在消费完之后就挂掉了,但是还没有给 MQ 回复 ACK 消息,MQ 并不知道消费者已经挂掉了,等消费者恢复后,消息还会投递原消息给这个消费者,造成重复消费。
  • 生产者在发布确认模式下,当生产者向 MQ 发布消息后,会等待 MQ 的确认,但是在这个时候网络延迟较高,那么生产者可能会重新发布这个消息,那么 MQ 就收到了两条相同的消息供消费者消费。

所以我们必须要保证同一个消息只会被消费一次,这就是幂等性问题。

解决思路

生产者每次发送消息之前,先为每条消息设置一个唯一ID,消费者拿到这个消息后,在消费之前,需要先进行判断,判断这个消息是否被消费,如果这个消息被消费了就不再进行消费。

所以问题就是怎么进行判断,比较常见的方式是消费后,将消息的唯一ID存到 SQL 数据库中 NoSQL 数据库中。

使用 SQL 数据库如 MySQL 的劣势就是在高并发时,如果是单个数 据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。

更常见的方式是使用 Redis 数据库,这种数据库效率比较高,而且有 setnx 这类命令,可以直接帮助我们判断消息是否被消费过;或者也可以使用 Set 集合保存消费过的唯一ID。

实战

本例中,我们使用 redis 的方式演示,关于在 SpringBoot 中如何整合 redis,在我之前的文档中已经演示过,下面就直接开始演示。

对于在 SpringBoot 中如何声明交换机和队列并绑定,配置类已经在前面写过很多遍,这里也不再演示,我们主要聚焦生产者和消费者的代码,主要是消费者,在消费者中,我们需要保证消息仅被消费一次。

生产者

  1. @Slf4j
  2. @RestController
  3. @RequestMapping("/producer")
  4. public class ProducerController {
  5. @Autowired
  6. private RabbitTemplate rabbitTemplate;
  7. // 请求这个接口,我们发送两个消息,一个给存在的交换机,一个给不存在的交换机
  8. @PostMapping("/send/{msg}")
  9. public String sendMsg(@PathVariable String msg) {
  10. // 生成消息
  11. Message message = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8))
  12. .setMessageId(UUID.randomUUID().toString())
  13. .build();
  14. // 发送消息
  15. rabbitTemplate.send(RabbitMqConstants.CONFIRM_EXCHANGE
  16. , RabbitMqConstants.EXCHANGE_BINDING_QUEUE
  17. , message);
  18. return "success";
  19. }
  20. }

消费者

Tip:在 SpringBoot 中,默认是 @RabbitListener 注解的方法结束后自动回复 ACK 给 MQ。

  1. @Slf4j
  2. @Component
  3. public class ConsumerListener {
  4. @Autowired
  5. private RedisUtils redisUtils;
  6. @RabbitListener(queues = RabbitMqConstants.CONFIRM_QUEUE)
  7. public void confirm(Message message) throws Exception {
  8. String messageId = message.getMessageProperties().getMessageId();
  9. // 从redis中判断消息是否已经被消费
  10. boolean exists = redisUtils.setIsMember("rabbitMsg", messageId);
  11. if (exists) {
  12. log.warn("检测到重复消费的消息,消息ID -> {}", messageId);
  13. return;
  14. }
  15. // 消费消息
  16. String body = new String(message.getBody());
  17. log.info("消费者接收到消息ID为 [{}] 的消息 -> {}", messageId, body);
  18. // 将已经消费的消息ID保存到set集合
  19. redisUtils.setAdd("rabbitMsg", messageId);
  20. // 模拟长网络延迟
  21. TimeUnit.SECONDS.sleep(20);
  22. }
  23. }

测试

为了演示出效果,我们先访问 http://localhost:8081/producer/send/hello1 地址向服务器发送 hello1 消息,在消息消费完之后,上面的代码中应该模拟了 20s 的网络延迟,在这个期间,我们直接强制关闭 SpringBoot 服务器。

然后我们再重启服务器,这时候,由于还没有恢复 ACK 消息,RabbitMQ 会认为消费者没有消费此消息,再次投递此消息给消费者,但是我们通过 redis 的判断,就避免了消息被重复消费的问题。

进阶功能 - 图23

优先队列

介绍

优先队列就是具有优先级的队列,消息被消费的顺序根据消息的优先级决定,优先级高的消息先被消费。

假设我们系统有一个订单催付的功能,我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,功能似乎并不复杂。

但是这个功能中,肯定有大客户小客户,对于大客户来说,可以对我们提供较大的利润,自然需要优先对待,而小客户迟一些催付也无妨,在这种情况下,我们就可以使用优先队列的功能。

实战

常量类

  1. public class RabbitMqConstants {
  2. public final static String PRIORITY_EXCHANGE = "priority_exchange";
  3. public final static String PRIORITY_QUEUE = "priority_queue";
  4. public final static String PRIORITY_EXCHANGE_BINDING_PRIORITY_QUEUE = "priority_eq";
  5. }

配置类

在声明队列时,我们应该设置优先级的最高值,不建议设置太高,不然会导致系统负载太大。

  1. @Configuration
  2. public class PriorityQueueConfig {
  3. @Bean // 声明交换机
  4. public DirectExchange priorityExchange() {
  5. return new DirectExchange(RabbitMqConstants.PRIORITY_EXCHANGE);
  6. }
  7. @Bean // 声明优先队列
  8. public Queue priorityQueue() {
  9. return QueueBuilder
  10. .nonDurable(RabbitMqConstants.PRIORITY_QUEUE)
  11. .maxPriority(10) //! 优先队列的最高值
  12. .build();
  13. }
  14. @Bean // 声明绑定关系
  15. public Binding priorityBinding() {
  16. return BindingBuilder
  17. .bind(priorityQueue())
  18. .to(priorityExchange())
  19. .with(RabbitMqConstants.PRIORITY_EXCHANGE_BINDING_PRIORITY_QUEUE);
  20. }
  21. }

生产者

在生产者中,发送的消息应该带有优先级,如果没有设置消息的优先级,则消息的优先级默认为0。

  1. @Slf4j
  2. @RestController
  3. @RequestMapping("/producer")
  4. public class ProducerController {
  5. @Autowired
  6. private RabbitTemplate rabbitTemplate;
  7. @PostMapping("/send/priority/{msg}/{priority}")
  8. public String sendMsgPriority(@PathVariable String msg, @PathVariable Integer priority) {
  9. // 生成消息
  10. Message message = MessageBuilder
  11. .withBody(msg.getBytes(StandardCharsets.UTF_8))
  12. .setPriority(priority)
  13. .build();
  14. // 发送消息
  15. rabbitTemplate.send(RabbitMqConstants.PRIORITY_EXCHANGE
  16. , RabbitMqConstants.PRIORITY_EXCHANGE_BINDING_PRIORITY_QUEUE
  17. , message);
  18. return "success";
  19. }
  20. }

消费者

  1. @Slf4j
  2. @Component
  3. public class ConsumerListener {
  4. @RabbitListener(queues = RabbitMqConstants.PRIORITY_QUEUE)
  5. public void consume(Message message) {
  6. String body = new String(message.getBody());
  7. log.info("消费者接收到消息 -> {}", body);
  8. }
  9. }

测试

Tip:在测试之前建议先注释消费者部分代码,因为如果开启消费者监听器,则发布一条消息就会被立即消费,所以建议先关闭。

依次访问如下地址:

  • /producer/send/priority/hello/5
  • /producer/send/priority/java/2
  • /producer/send/priority/world/7
  • /producer/send/priority/python/4

访问了这些地址之后,应该会向队列中推送四条消息,并有不同的优先级,预期优先级顺序是 world -> hello -> python -> java 。我们这时在取消消费者部分代码的注释,重新启动服务,可以看到控制台打印如下信息:

进阶功能 - 图24

也确实符合我们的预期。

惰性队列

介绍

RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持 更多的消息存储。

当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。

默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中, 这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然 RabbitMQ 的开发者们一直在升级相关的算法, 但是效果始终不太理想,尤其是在消息量特别大的时候。

两种模式

RabbitMQ 中队列具备两种模式:

  • default
  • lazy

默认使用的是 default 模式,在 3.6 之前的版本无需做任何变更;lazy 即为惰性队列的模式。

原生方式声明惰性队列

  1. Map<String, Object> args = new HashMap<String, Object>();
  2. args.put("x-queue-mode", "lazy");
  3. channel.queueDeclare("testQueue", false, false, false, args);

SpringBoot 整合后声明惰性队列

  1. @Bean
  2. public Queue testQueue() {
  3. return QueueBuilder
  4. .nonDurable("testQueue")
  5. .lazy()
  6. .build();
  7. }

内存开销对比

进阶功能 - 图25

在发送 1 百万条消息,每条消息大概占 1KB 的情况下,普通队列占用内存是 1.2GB,而惰性队列仅仅占用 1.5MB。

集群

介绍

在前面使用的 RabbitMQ 都是在一台服务器上的,但是在真实的环境中,可能不能满足我们的需求。因为单台的服务器的服务是非常不安全的,一旦出现内存崩溃、服务器宕机等一系列问题,可能直接会导致无法传递消息,而出现这种问题对于应用程序来说可能是致命的。

为了避免出现上述的问题,我们可以将多台 RabbitMQ 服务器搭建成一个集群,在一个集群中,其中一个节点宕机,并不会影响其他节点的使用,可以很大程度上提高安全性。

并且搭建 RabbitMQ 集群可能很大程度上提高消息的吞吐量,在高吞吐量的场景中,搭建集群无疑也是一个很不错的选择。

搭建步骤

我们使用克隆的方式,先准备三台虚拟机,可以选择链接克隆,也可以完整克隆。

进阶功能 - 图26

克隆完成之后,启动这三台虚拟机。

第一步:修改虚拟机 IP 地址

修改三台主机的 IP 分别为 192.168.20.10192.168.20.20192.168.20.30 ,当然,这个 IP 地址需要根据自己虚拟网卡的地址进行调整。

  1. $ cd /etc/sysconfig/network-scripts/
  2. $ vim ifcfg-ens33

文件内容如下:

  1. TYPE=Ethernet
  2. PROXY_METHOD=none
  3. BROWSER_ONLY=no
  4. BOOTPROTO=static
  5. IPADDR=192.168.20.10
  6. NETMASK=255.255.255.0
  7. GATEWAY=192.168.20.2
  8. DNS1=114.114.114.114
  9. DEFROUTE=yes
  10. IPV4_FAILURE_FATAL=no
  11. NAME=ens33
  12. UUID=b7310b0a-1fa8-4a93-b224-5b058417fe07
  13. DEVICE=ens33
  14. ONBOOT=yes

重启网络服务

  1. $ systemctl restart network

第二步:修改三台机器的主机名

分别将三台主机的主机名修改为 node1node2node3

  1. $ vim /etc/hostname
  2. # 修改文件的内容为节点名称
  3. node1
  4. $ reboot # 重启主机方可应用

第三步:配置各个结点的 hosts 文件

配置 hosts 文件是为了三台主机之间可以直接通过主机名互相访问。

  1. $ vim /etc/hosts
  2. # 三个主机的host文件都填入如下内容
  3. 192.168.20.10 node1
  4. 192.168.20.20 node2
  5. 192.168.20.30 node3

修改完后,使用 ping 命令确保连通性后,再继续后面步骤。

进阶功能 - 图27

第四步:确保各个结点的 cookie 文件使用的是同一个值

在 node1 上执行如下远程操作命令即可

  1. $ scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie
  2. $ scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie

执行命令期间,如果询问是否继续,输入 yes ;要求输入密码,按要求输入密码即可。

第五步:启动 RabbitMQ 服务

在三台结点上分别输入如下命令

  1. $ rabbitmq-server -detached

同时这条命令顺带启动 Erlang 虚拟机和 RbbitMQ 应用服务。

第六步:关闭子结点 RabbitMQ 服务,并加入集群

在 node2 中输入:

  1. # rabbitmqctl stop 会将 Erlang 虚拟机关闭,rabbitmqctl stop_app 只关闭 RabbitMQ 服务
  2. $ rabbitmqctl stop_app
  3. $ rabbitmqctl reset
  4. $ rabbitmqctl join_cluster rabbit@node1
  5. $ rabbitmqctl start_app # 只启动应用服务

在 node3 中输入:

  1. $ rabbitmqctl stop_app
  2. $ rabbitmqctl reset
  3. $ rabbitmqctl join_cluster rabbit@node2
  4. $ rabbitmqctl start_app

查看集群状态

  1. $ rabbitmqctl cluster_status

如下图所示表示成功

进阶功能 - 图28

第七步:重新设置用户

  1. # 创建账号
  2. $ rabbitmqctl add_user admin 123456
  3. # 设置用户角色
  4. $ rabbitmqctl set_user_tags admin administrator
  5. # 设置用户权限
  6. $ rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

后续操作:解除集群节点

以解除 node2 节点为例子,在 node2 节点中输入:

  1. $ rabbitmqctl stop_app
  2. $ rabbitmqctl reset
  3. $ rabbitmqctl start_app
  4. $ rabbitmqctl cluster_status

最后在 node1 中输入:

  1. $ rabbitmqctl forget_cluster_node rabbit@node2

搭建成功

如果搭建成功,访问 http://192.168.20.20:15672/ 即可进入 RabbitMQ 集群的 Web 管理界面,进入首页后应该可以看到如下三台节点。

进阶功能 - 图29

镜像队列

介绍

默认情况下,RabbitMQ 搭建的集群之间的队列并不会共享,也就是我们在 node1 上创建的队列在 node2 中并不存在,这就会存在一个问题,如果 node1 突然宕机了,那么原本在 node1 上的所有队列也都失效了,如果要队列恢复必须等到 node1 恢复,并且如果队列没有持久化还存在数据丢失的风险。

为了避免上述的问题,就必须创建镜像队列,镜像队列可以将 node1 上的队列拷贝到另一个节点中,这样如果 node1 失效了,其他节点还可以照常提供服务,以保障服务的高可用性。

搭建方式

按照之前的方式搭建好集群后,我们可以进入任意一个节点的 Web 管理页面,然后根据下图方式,即可搭建镜像队列的策略。

进阶功能 - 图30

策略参数说明

  • Name:policy的名称。
  • Pattern:queue的匹配模式(正则表达式)。
  • Definitio:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode。
  • ha-mode:指明镜像队列的模式,有效值为 all/exactly/nodes。

    • all:表⽰在集群中所有的节点上进⾏镜像(并不推荐)。
    • exactly:表⽰在指定个数的节点上进⾏镜像,节点的个数由ha-params指定。
    • nodes:表⽰在指定的节点上进⾏镜像,节点名称通过ha-params指定。
  • ha-params:ha-mode模式需要⽤到的参数。
  • ha-sync-mode:进⾏队列中消息的同步⽅式,有效值为automatic和manual。
  • priority:可选参数,policy的优先级。

请注意⼀个事实,镜像配置的 pattern 采⽤的是正则表达式匹配,也就是说会匹配⼀组。

测试

在开启镜像策略后,我们添加的以 mirror 开头的队列,应该都类似如下:

进阶功能 - 图31

这时候,当 node1 宕机,我们可以在 node1 输入 rabbitmqctl stop_app 关闭服务,那么 RabbitMQ 会自动切换到 node2 节点,并自动在 node3 节点中再镜像一份,保证队列的可用性。

进阶功能 - 图32

注意,这时候即使 node1 节点恢复,这个队列也还是在 node2 节点中,并不会恢复到 node1 节点内。

通过镜像队列,可以保证即使集群中仅剩下一个节点了,依然能够保证队列的可用性。

Haproxy 实现负载均衡

问题

搭建完集群和镜像队列之后,还有一个问题就是我们的客户端,每次只能与其中一个节点建立连接,如下:

  1. ConnectionFactory factory = new ConnectionFactory();
  2. // RabbitMQ所在服务器的地址
  3. factory.setHost("192.168.20.20");
  4. // 端口号,默认也是5672
  5. factory.setPort(5672);

这样即使搭建了集群,一旦某个节点宕机了,还是会导致连接中断,从而影响到应用程序,这显然也不是我们所需要的,那怎么解决这个问题呢?

我们可以在客户端和服务器之间多加一层作为网关,这个网关负责去找具体的某个 RabbitMQ 服务,而作为客户端,不需要关注具体的 RabbitMQ 服务器的地址,只需要直到网关的地址,通过网关就可以找到 RabbitMQ 服务器。

网关中可以设置多个 RabbitMQ 服务器,当客户端过来的时候,就可以将其连接到某个具体的 RabbitMQ 服务器中,从而实现负载均衡的作用。

介绍

HAProxy 提供高可用性、负载均衡及基于 TCPHTTP 应用的代理,支持虚拟主机,它是免费、快速并 且可靠的一种解决方案,包括 Twitter、Reddit、StackOverflow、GitHub 在内的多家知名互联网公司在使用。 HAProxy 实现了一种事件驱动、单一进程模型,此模型支持非常大的井发连接数。

搭建步骤

Tip:以下内容均在 node1 节点中搭建,我们也可以新建一台虚拟机,作为网关。

第一步:下载 haproxy

  1. $ yum -y install haproxy

第二步:修改 haproxy.cfg 配置文件

  1. $ vim /etc/haproxy/haproxy.cfg

文件内容如下:

  1. # logging options
  2. global
  3. log 127.0.0.1 local0 info
  4. maxconn 5120
  5. # haproxy 的安装地址
  6. chroot /var/lib/haproxy
  7. pidfile /var/run/haproxy.pid
  8. uid 99
  9. gid 99
  10. daemon
  11. quiet
  12. nbproc 20
  13. defaults
  14. log global
  15. # 使用4层代理模式,”mode http”为7层代理模式
  16. mode tcp
  17. # if you set mode to tcp,then you nust change tcplog into httplog
  18. option tcplog
  19. option dontlognull
  20. retries 3
  21. option redispatch
  22. maxconn 2000
  23. contimeout 5s
  24. # 客户端空闲超时时间为 30秒 则HA 发起重连机制
  25. clitimeout 30s
  26. # 服务器端链接超时时间为 15秒 则HA 发起重连机制
  27. srvtimeout 15s
  28. #front-end IP for consumers and producters
  29. listen rabbitmq_cluster
  30. #! 客户端访问的端口
  31. bind 0.0.0.0:5555
  32. # 配置TCP模式
  33. mode tcp
  34. # 简单的轮询
  35. balance roundrobin
  36. # rabbitmq集群节点配置
  37. #inter 每隔五秒对mq集群做健康检查, 2次正确证明服务器可用,2次失败证明服务器不可用,并且配置主备机制
  38. server node1 192.168.20.10:5672 check inter 5000 rise 2 fall 2
  39. server node2 192.168.20.20:5672 check inter 5000 rise 2 fall 2
  40. server node3 192.168.20.30:5672 check inter 5000 rise 2 fall 2
  41. # 配置haproxy web监控,查看统计信息
  42. # 表示可以访问 http://192.168.20.10:8100/rabbitmq-stats
  43. listen stats
  44. # 网关的地址
  45. bind 192.168.20.10:8100
  46. mode http
  47. option httplog
  48. stats enable
  49. stats uri /rabbitmq-stats
  50. stats refresh 5s

第三步:启动 haproxy

  1. $ haproxy -f /etc/haproxy/haproxy.cfg

测试

  1. # 查看进程
  2. $ ps -ef | grep haproxy
  3. nobody 12068 1 0 22:07 ? 00:00:00 haproxy -f /etc/haproxy/haproxy.cfg

访问 http://192.168.20.10:8100/rabbitmq-stats 查看 Web 界面。

进阶功能 - 图33

如果上面一切正常,说明搭建成功。

之后我们访问网关的地址 + 上方配置文件第33行的端口就可以访问到 RabbitMQ 了,而不需要访问具体某个节点的端口。

比如我们网关的地址是 192.168.20.10 ,配置的端口是 5555 ,我们就可以通过这个地址来访问。

Federation Exchange

介绍

(broker 北京),(broker 深圳)彼此之间相距甚远,网络延迟是一个不得不面对的问题。有一个在北京 的业务(Client 北京) 需要连接(broker 北京),向其中的交换器 exchangeA 发送消息,此时的网络延迟很小, (Client 北京)可以迅速将消息发送至 exchangeA 中,就算在开启了发布确认机制或者事务机制的情况下,也可以迅速收到确认信息。

此时又有个在深圳的业务(Client 深圳)需要向 exchangeA 发送消息, 那么(Client 深圳) (broker 北京)之间有很大的网络延迟,(Client 深圳) 将发送消息至 exchangeA 会经历一定的延迟,尤其是在开启了发布确认机制或者事务机制的情况下,(Client 深圳) 会等待很长的延迟时间来接收(broker 北京)的确认信息,进而必然造成这条发送线程的性能降低,甚至造成一定程度上的阻塞。

将业务(Client 深圳)部署到北京的机房可以解决这个问题,但是如果(Client 深圳)调用的另些服务都部署在深圳,那么又会引发新的时延问题,总不见得将所有业务全部部署在一个机房,那么容灾又何以实现? 这里使用 Federation 插件就可以很好地解决这个问题。

搭建步骤

搭建前提:保证每台节点单独运行

我们以前面集群中的 node1 和 node2 节点为例。

原理图:

进阶功能 - 图34

第一步:在两台机器上开启 federation 相关插件

  1. $ rabbitmq-plugins enable rabbitmq_federation
  2. $ rabbitmq-plugins enable rabbitmq_federation_management

第二步:在 node2 上创建交换机 fed_exchange 和 队列 node2_queue,并绑定交换机和队列。

第三步:在 downstream(node2) 上配置 upstream(node1)

进阶功能 - 图35

第四步:添加策略

进阶功能 - 图36

成功的结果

进阶功能 - 图37

Federation Queue

介绍

联邦队列可以在多个 Broker 节点(或者集群)之间为单个队列提供均衡负载的功能。一个联邦队列可以 连接一个或者多个上游队列(upstream queue),并从这些上游队列中获取消息以满足本地消费者消费消息的需求。

搭建步骤

原理图

进阶功能 - 图38

第一步:添加 upstream(见上一小节)

第二步:添加策略

进阶功能 - 图39

Shovel

介绍

Federation 具备的数据转发功能类似,Shovel 够可靠、持续地从一个 Broker 中的队列(作为源端,即 source)拉取数据并转发至另一个 Broker 中的交换器(作为目的端,即 destination)。

作为源端的队列和作 为目的端的交换器可以同时位于同一个 Broker,也可以位于不同的 Broker 上。Shovel 可以翻译为”铲子”, 是一种比较形象的比喻,这个”铲子”可以将消息从一方”铲子”另一方。Shovel 行为就像优秀的客户端应用 程序能够负责连接源和目的地、负责消息的读写及负责连接失败问题的处理。

搭建步骤

原理图

进阶功能 - 图40

第一步:开启相关插件

  1. $ rabbitmq-plugins enable rabbitmq_shovel
  2. $ rabbitmq-plugins enable rabbitmq_shovel_management

第二步:添加 Shovel 源和目的地

进阶功能 - 图41


本章完。