1.普通消息的处理流程

image.png

  • 消息生成者发送消息
  • MQ 收到消息,将消息进行持久化,在存储中新增一条记录
  • 返回ACK给生产者
  • MQ push 消息给对应的消费者,然后等待消费者返回ACK
  • 如果消息消费者在指定时间内成功返回ack,那么MQ认为消息消费成功,在存储中删除消息,即执行第6步;如果MQ在指定时间内没有收到ACK,则认为消息消费失败,会尝试重新push消息,重复执行4、5、6步骤
  • MQ删除消息

    1.1 普通消息处理存在的一致性问题

    此时可能有同学会想,我们可否将消息发送和业务处理放在同一个本地事务中来进行处理,如果业务消息发送失败,那么本地事务就回滚,这样是不是就能解决消息发送的一致性问题呢?
    1. @Transactionnal
    2. public void processOrder() {
    3. try{
    4. // 订单处理(业务操作)
    5. orderService.process();
    6. // 发送订单处理成功消息(发送消息)
    7. sendBizMsg ();
    8. }catch(Exception e){
    9. 事务回滚;
    10. }
    11. }

    消息发送的异常情况分析

    可能的情况
可能的情况 是否一致
订单处理成功,然后突然宕机,事务未提交,消息没有发送出去。本地事务回滚 一致
订单处理成功,消息发送成功,但是MQ由于其他原因,导致消息存储失败,ACK明确回复失败,本地事务回滚 一致
订单处理成功,由于网络原因或者MQ宕机,消息没有发送出去,事务回滚 一致
订单处理成功,消息存储成功,但是MQ处理超时,从而ACK没有确认,超时后 生产者认为MQ处理失败,但实际MQ可能处理成功了,并且给消费者发了消息。导致发送方本地事务回滚(生产者事务回滚),从而导致和消费者数据不一致。 不一致

2.事务消息处理的流程

由于传统的处理方式无法解决消息生成者本地事务处理成功与消息发送成功两者的一致性问题,因此事务消息就诞生了,它实现了消息生成者本地事务与消息发送的原子性,保证了消息生成者本地事务处理成功与消息发送成功的最终一致性问题。
image.png

  • 事务消息与普通消息的区别就在于消息生产环节,生产者首先预发送一条消息到MQ(这也被称为发送 half 消息)
  • MQ接受到消息后,先进行持久化,则存储中会新增一条状态为待发送的消息
  • 然后返回ACK给消息生产者,此时MQ不会触发消息推送事件;
  • 生产者预发送消息成功后,执行本地事务;
  • 执行本地事务,执行完成后,发送执行结果给MQ;
  • MQ会根据结果删除或者更新消息状态为可发送;
  • 如果消息状态更新为可发送,则MQ会push消息给消费者,后面消息的消费和普通消息是一样的;

    事务消息异常情况分析:

    | 异常情况 | 一致性 | 处理异常方法 | | —- | —- | —- | | 消息未存储,业务操作未执行 | 一致 | 无 | | 存储待发送消息成功,但是ACK失败,导致业务未执行(可能是MQ处理超时、网络抖动等原因) | 不一致 | MQ确认业务操作结果,处理消息(删除消息) | | 存储待发送消息成功,ACK成功,业务执行(可能成功也可能失败),但是MQ没有收到生产者业务处理的最终结果 | 不一致 | MQ确认业务操作结果,处理消息(根据就业务处理结果,更新消息状态,如果业务执行成功,则投递消息,失败则删除消息) | | 业务处理成功,并且发送结果给MQ,但是MQ更新消息失败,导致消息状态依旧为待发送 | 不一致 | 同上 |

常见的问题:

  1. 问:如果预发送消息失败,是不是业务就不执行了?

    1. 答:是的,对于基于消息最终一致性的方案,一般都会强依赖这步,如果这个步骤无法得到保证,那么最终也 就不可能做到最终一致性了。
  2. 问:为什么要增加一个消息预发送机制,增加两次发布出去消息的重试机制,为什么不在业务成功之后,发送失败的话使用一次重试机制?

    1. 答:如果业务执行成功,再去发消息,此时如果还没来得及发消息,业务系统就已经宕机了,系统重启后,根本没有记录之前是否发送过消息,这样就会导致业务执行成功,消息最终没发出去的情况。
  3. 问:如果consumer消费失败,是否需要producer做回滚呢?

    答:这里的事务消息,producer不会因为consumer消费失败而做回滚,采用事务消息的应用,其所追求的是高可用最终一致性,消息消费失败的话,MQ自己会负责重推消息,直到消费成功。因此,事务消息是针对生产端而言的,而消费端,消费端的一致性是通过MQ的重试机制来完成的。

  4. 如果consumer端因为业务异常而导致回滚,那么岂不是两边最终无法保证一致性?

    1. 答:基于消息的最终一致性方案必须保证消费端在**业务上的操作没障碍**,它只允许系统异常的失败,不允许业务上的失败,比如在你业务上抛出个NPE之类的问题,导致你消费端执行事务失败,那就很难做到一致了。

3.独立消息服务的最终一致性

image.png

独立消息服务最终一致性与本地消息服务最终一致性最大的差异就在于将消息的存储单独地做成了一个RPC的服务,这个过程其实就是模拟了事务消息的消息预发送过程,如果预发送消息失败,那么生产者业务就不会去执行,因此对于生产者的业务而言,它是强依赖于该消息服务的。不过好在独立消息服务支持水平扩容,因此只要部署多台,做成HA的集群模式,就能够保证其可靠性。在消息服务中,还有一个单独地定时任务,它会定期轮训长时间处于待发送状态的消息,通过一个check补偿机制来确认该消息对应的业务是否成功,如果对应的业务处理成功,则将消息修改为可发送,然后将其投递给MQ;如果业务处理失败,则将对应的消息更新或者删除即可。因此在使用该方案时,消息生产者必须同时实现一个check服务,来供消息服务做消息的确认。对于消息的消费,该方案与上面的处理是一样,都是通过MQ自身的重发机制来保证消息被消费。