微服务可以设计成消息驱动的微服务,响应式系统也可以基于消息中间件来做,从这个角度来说,在互联网应用开发中,消息中间件真的是太重要了。

以 RabbitMQ 为例,松哥来和大家聊一聊消息中间消息发送可靠性的问题。

注意,以下内容我主要和大家讨论如何确保消息生产者将消息发送成功,并不涉及消息消费的问题。

1. RabbitMQ消息发送机制

大家知道,RabbitMQ 中的消息发送引入了 Exchange(交换机)的概念,消息的发送首先到达交换机上,然后再根据既定的路由规则,由交换机将消息路由到不同的 Queue(队列)中,再由不同的消费者去消费。

08-RabbitMQ发送可靠性 - 图1

大致的流程就是这样,所以要确保消息发送的可靠性,主要从两方面去确认:

  1. 消息成功到达 Exchange
  2. 消息成功到达 Queue

如果能确认这两步,那么我们就可以认为消息发送成功了。

如果这两步中任一步骤出现问题,那么消息就没有成功送达,此时我们可能要通过重试等方式去重新发送消息,多次重试之后,如果消息还是不能到达,则可能就需要人工介入了。

经过上面的分析,我们可以确认,要确保消息成功发送,我们只需要做好三件事就可以了:

  1. 确认消息到达 Exchange。
  2. 确认消息到达 Queue。
  3. 开启定时任务,定时投递那些发送失败的消息。

2. RabbitMQ的努力

上面提出的三个步骤,第三步需要我们自己实现,前两步 RabbitMQ 则有现成的解决方案。

如何确保消息成功到达 RabbitMQ?RabbitMQ 给出了两种方案:

  1. 开启事务机制
  2. 发送方确认机制

这是两种不同的方案,千万不可以同时开启,只能选择其中之一,如果两者同时开启,则会报如下错误:

08-RabbitMQ发送可靠性 - 图2

我们分别来看。以下所有案例都在 Spring Boot 中展开,文末可以下载相关源码。

2.1. 开启事务机制

首先创建一个Empty Project,工程名叫做

08-RabbitMQ发送可靠性 - 图3

08-RabbitMQ发送可靠性 - 图4

在mq_send_message工程下使用Spring Initializr的方式创建一个SpringBoot的module,模块叫做tx

08-RabbitMQ发送可靠性 - 图5

选择依赖

08-RabbitMQ发送可靠性 - 图6

在tx模块的application.properties配置文件中做如下配置:

  1. spring.rabbitmq.host=localhost
  2. spring.rabbitmq.port=5672
  3. spring.rabbitmq.username=guest
  4. spring.rabbitmq.password=guest
  5. spring.rabbitmq.virtual-host=/

在tx模块的org.javaboy.tx.config包下面创建RabbitMQ的配置类

  1. @Configuration
  2. public class RabbitConfig {
  3. public static final String JAVABOY_MSG_QUEUE_NAME = "javaboy_msg_queue_name";
  4. public static final String JAVABOY_MSG_EXCHANGE_NAME = "javaboy_msg_exchange_name";
  5. @Bean
  6. Queue msgQueue() {
  7. return new Queue(JAVABOY_MSG_QUEUE_NAME, true, false, false);
  8. }
  9. @Bean
  10. DirectExchange directExchange() {
  11. return new DirectExchange(JAVABOY_MSG_EXCHANGE_NAME, true, false);
  12. }
  13. @Bean
  14. Binding msgBinding() {
  15. return BindingBuilder.bind(msgQueue())
  16. .to(directExchange())
  17. .with(JAVABOY_MSG_QUEUE_NAME);
  18. }
  19. //Spring Boot 中开启 RabbitMQ 事务机制的方式如下:
  20. //首先需要先提供一个事务管理器,如下:
  21. @Bean
  22. RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {
  23. return new RabbitTransactionManager(connectionFactory);
  24. }
  25. //自己创建一个RabbitTemplate注入到IOC容器中
  26. @Bean
  27. RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
  28. RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  29. //开启事务模式
  30. rabbitTemplate.setChannelTransacted(true);
  31. return rabbitTemplate;
  32. }
  33. }

在tx模块的org.javaboy.tx.service包下面创建业务层

  1. /**
  2. * 1. 客户端发请求,将通信信道设置为事务模式
  3. * 2。 服务端给出答复,同意将通信信道设置为事务模式
  4. * 3。 发送消息
  5. * 4。 提交事务
  6. * 5。 服务端给出答复,确认事务提交。
  7. */
  8. @Service
  9. public class MsgService {
  10. @Autowired
  11. RabbitTemplate rabbitTemplate;
  12. @Transactional
  13. public void send() {
  14. rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_MSG_EXCHANGE_NAME, RabbitConfig.JAVABOY_MSG_QUEUE_NAME, "hello javaboy!");
  15. int i = 1 / 0;
  16. }
  17. }

可能会有小伙伴有这样的疑问,如果MySQL也需要用事务呢?

答案:MySQL开启事务也是使用@Transactional注解,但是MySQL和RabbitMQ使用的事务管理器是不一样的,可以使用@Transactional注解中的transactionManager属性来指定使用哪个事务管理器。

在tx模块的org.javaboy.tx.controller包下面创建控制器

  1. @RestController
  2. public class HelloController {
  3. @Autowired
  4. MsgService msgService;
  5. @GetMapping("/send")
  6. public void hello() {
  7. msgService.send();
  8. }
  9. }

接下来启动 Spring Boot 项目,在浏览器访问http://localhost:8080/send,然后查看控制台打印的日志,日志报了异常,并且RabbitMQ的web管理页面的Queues选项卡中一直没有消息,说明消息并未发送成功,事务已经起作用了。

当我们开启事务模式之后,RabbitMQ 生产者发送消息会多出四个步骤:

  1. 客户端发出请求,将信道设置为事务模式。
  2. 服务端给出回复,同意将信道设置为事务模式。
  3. 客户端发送消息。
  4. 客户端提交事务。
  5. 服务端给出响应,确认事务提交。

上面的步骤,除了第三步是本来就有的,其他几个步骤都是平白无故多出来的。所以大家看到,事务模式其实效率有点低,这并非一个最佳解决方案。我们可以想想,什么项目会用到消息中间件?一般来说都是一些高并发的项目,这个时候并发性能尤为重要。

所以,RabbitMQ 还提供了发送方确认机制(publisher confirm)来确保消息发送成功,这种方式,性能要远远高于事务模式,一起来看下。

2.2. 发送方确认机制

在mq_send_message工程下使用Spring Initializr的方式创建一个SpringBoot的module,模块名叫做confirm

08-RabbitMQ发送可靠性 - 图7

选择依赖

08-RabbitMQ发送可靠性 - 图8

在confirm模块的application.properties配置文件中做如下配置:

  1. spring.rabbitmq.host=localhost
  2. spring.rabbitmq.port=5672
  3. spring.rabbitmq.username=guest
  4. spring.rabbitmq.password=guest
  5. spring.rabbitmq.virtual-host=/
  6. # 开启发送确认机制,将来消息到达交换机之后会有一个回调
  7. spring.rabbitmq.publisher-confirm-type=correlated
  8. # 消息到达队列的回调(消息如果没有成功到达队列,会触发回调方法)
  9. spring.rabbitmq.publisher-returns=true

倒数第二行是配置消息到达交换器的确认回调,最后一行则是配置消息没有成功到达队列的回调。

倒数第二行属性的配置有三个取值:

  1. none:表示禁用发布确认模式,默认即此。
  2. correlated:表示成功发布消息到交换器后会触发的回调方法。
  3. simple:类似 correlated,并且支持 waitForConfirms()waitForConfirmsOrDie() 方法的调用。

在confirm模块的org.javaboy.confirm.config包下面创建RabbitMQ的配置类

  1. @Configuration
  2. public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
  3. public static final String JAVABOY_EXCHANGE_NAME = "javaboy_exchange_name";
  4. public static final String JAVABOY_QUEUE_NAME = "javaboy_queue_name";
  5. private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
  6. @Autowired
  7. RabbitTemplate rabbitTemplate;
  8. //把两个回调方法设置到IOC容器中的RabbitTemplate对象中
  9. @PostConstruct
  10. public void initRabbitTemplate() {
  11. rabbitTemplate.setReturnsCallback(this);
  12. rabbitTemplate.setConfirmCallback(this);
  13. }
  14. @Bean
  15. Binding msgBinding() {
  16. return BindingBuilder.bind(msgQueue())
  17. .to(directExchange())
  18. .with(JAVABOY_QUEUE_NAME);
  19. }
  20. @Bean
  21. DirectExchange directExchange() {
  22. return new DirectExchange(JAVABOY_EXCHANGE_NAME, true, false);
  23. }
  24. @Bean
  25. Queue msgQueue() {
  26. return new Queue(JAVABOY_QUEUE_NAME, true, false, false);
  27. }
  28. /**
  29. * 消息成功到达交换机,会触发该回调方法
  30. * @param correlationData
  31. * @param ack:如果是true表示消息成功到达交换机,如果是false表示发送失败。
  32. * @param cause:原因
  33. */
  34. @Override
  35. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  36. if (ack) {
  37. //消息成功到达交换机
  38. logger.info("{} 消息成功到达交换机", correlationData.getId());
  39. }else{
  40. logger.error("{} 消息未到达交换机,{}", correlationData.getId(), cause);
  41. }
  42. }
  43. /**
  44. * 消息未成功到达队列,会触发该回调方法
  45. * @param returned
  46. */
  47. @Override
  48. public void returnedMessage(ReturnedMessage returned) {
  49. logger.error("{} 消息未成功到达队列", returned.getMessage().getMessageProperties().getMessageId());
  50. }
  51. }

关于这个配置类,我说如下几点:

  1. 定义配置类,实现 RabbitTemplate.ConfirmCallbackRabbitTemplate.ReturnsCallback 两个接口,这两个接口,前者的回调用来确定消息到达交换器,后者则会在消息路由到队列失败时被调用。
  2. 定义 initRabbitTemplate 方法并添加 @PostConstruct 注解,在该方法中为 rabbitTemplate 分别配置这两个 Callback。

在confirm模块的org.javaboy.confirm.controller包下面创建控制器

  1. @RestController
  2. public class HelloController {
  3. @Autowired
  4. RabbitTemplate rabbitTemplate;
  5. @GetMapping("/send")
  6. public void hello() {
  7. rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME, RabbitConfig.JAVABOY_QUEUE_NAME, "hello javaboy!",new CorrelationData(UUID.randomUUID().toString()));
  8. //rabbitTemplate.convertAndSend("aaa", RabbitConfig.JAVABOY_QUEUE_NAME, "hello javaboy!",new CorrelationData(UUID.randomUUID().toString()));
  9. //rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME, "bbb", "hello javaboy!",new CorrelationData(UUID.randomUUID().toString()));
  10. }
  11. }

启动应用,然后在浏览器访问http://localhost:8080/send,查看RabbitMQ的web管理页面的Queues选项卡,发现队列中有一条消息等待被消费。

08-RabbitMQ发送可靠性 - 图9

并且控制台打印了日志消息

08-RabbitMQ发送可靠性 - 图10

接下来尝试将消息发送到一个不存在的交换机中,像下面这样:

  1. rabbitTemplate.convertAndSend("aaa", RabbitConfig.JAVABOY_QUEUE_NAME, "hello javaboy!",new CorrelationData(UUID.randomUUID().toString()));

控制台打印的日志如下:

08-RabbitMQ发送可靠性 - 图11

接下来我们给定一个真实存在的交换器,但是给一个不存在的队列,像下面这样:

  1. rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME, "bbb", "hello javaboy!",new CorrelationData(UUID.randomUUID().toString()));

控制台打印的日志如下:

08-RabbitMQ发送可靠性 - 图12

注意:一般来说消息能成功到达交换机,大概率就能成功到达消息队列,如果消息成功到达交换机,却没有成功到达消息队列,肯定是你的代码写错了。

3. 失败重试

失败重试分两种情况,一种是压根没找到 MQ 导致的失败重试,另一种是找到 MQ 了,但是消息发送失败了。

两种重试我们分别来看。

3.1. 自带重试机制

前面所说的事务机制和发送方确认机制,都是发送方确认消息发送成功的办法。如果发送方一开始就连不上 MQ,那么 Spring Boot 中也有相应的重试机制,但是这个重试机制就和 MQ 本身没有关系了,这是利用 Spring 中的 retry 机制来完成的,具体配置如下:

  1. spring.rabbitmq.template.retry.enabled=true
  2. spring.rabbitmq.template.retry.initial-interval=1000ms
  3. spring.rabbitmq.template.retry.max-attempts=10
  4. spring.rabbitmq.template.retry.max-interval=10000ms
  5. spring.rabbitmq.template.retry.multiplier=2

从上往下配置含义依次是:

  • 开启重试机制。
  • 重试间隔时间。
  • 最大重试次数。
  • 最大重试间隔时间。
  • 间隔时间乘数。(这里配置间隔时间乘数为 2,则第一次间隔时间 1 秒,第二次重试间隔时间 2 秒,第三次 4 秒,以此类推)

在confirm模块配置完成后,再次启动 Spring Boot 项目,然后关掉 MQ,此时尝试发送消息,就会发送失败,进而导致自动重试。

控制台日志打印如下:

08-RabbitMQ发送可靠性 - 图13

3.2. 业务重试

业务重试主要是针对消息没有到达交换器的情况。

如果消息没有成功到达交换器,根据我们第二小节的讲解,此时就会触发消息发送失败回调,在这个回调中,我们就可以做文章了!

整体思路是这样:

  1. 首先创建一张表,用来记录发送到中间件上的消息,像下面这样:

08-RabbitMQ发送可靠性 - 图14

每次发送消息的时候,就往数据库中添加一条记录。这里的字段都很好理解,有三个我额外说下:

  • status:表示消息的状态,有三个取值,0,1,2 分别表示消息发送中、消息发送成功以及消息发送失败。
  • tryTime:表示消息的第一次重试时间(消息发出去之后,在 tryTime 这个时间点还未显示发送成功,此时就可以开始重试了)。
  • count:表示消息重试次数。

其他字段都很好理解,我就不一一啰嗦了。

  1. 在消息发送的时候,我们就往该表中保存一条消息发送记录,并设置状态 status 为 0,tryTime 为 1 分钟之后。
  2. 在 confirm 回调方法中,如果收到消息发送成功的回调,就将该条消息的 status 设置为1(在消息发送时为消息设置 msgId,在消息发送成功回调时,通过 msgId 来唯一锁定该条消息)。
  3. 另外开启一个定时任务,定时任务每隔 10s 就去数据库中捞一次消息,专门去捞那些 status 为 0 并且已经过了 tryTime 时间记录,把这些消息拎出来后,首先判断其重试次数是否已超过 3 次,如果超过 3 次,则修改该条消息的 status 为 2,表示这条消息发送失败,并且不再重试。对于重试次数没有超过 3 次的记录,则重新去发送消息,并且为其 count 的值+1。

大致的思路就是上面这样,松哥这里就不给出代码了,松哥的 vhr 里边邮件发送就是这样的思路来处理的,完整代码大家可以参考 vhr 项目(https://github.com/lenve/vhr)。

当然这种思路有两个弊端:

  1. 去数据库走一遭,可能拖慢 MQ 的 Qos,不过有的时候我们并不需要 MQ 有很高的 Qos,所以这个应用时要看具体情况。
  2. 按照上面的思路,可能会出现同一条消息重复发送的情况,不过这都不是事,我们在消息消费时,解决好幂等性问题就行了。

当然,大家也要注意,消息是否要确保 100% 发送成功,也要看具体情况。

好啦,这就是关于消息生产者的一些常见问题以及对应的解决方案,下一小节松哥和大家探讨如果保证消息消费成功并解决幂等性问题。