有一些第三方的MQ是支持事务消息的,比如RocketMQ,他们支持事务消息的方式也是类似于采用的二阶段提交,但是市面上的一些主流的MQ都是不支持事务消息的,比如RabbitMQ和Kafka都不支持。

我们知道事务消息。可以想象一下,如果没有事务消息的话,我们用普通消息如何保证最终一致性。

普通消息先发消息方式

MQ事务消息 - 图1
假如分为两步操作
1、发送消息,发送消息成功(发送消息通知B系统给B增加50元)
2、执行本地事务(扣除A账户50元)

异常情况 结果
发送消息失败 不进行2步骤执行
发送消息成功 进行本地事务执行,步骤2执行如果失败,需要通知B系统去回滚给B增加的50元

普通消息先进行本地事务执行

事务也有执行的先后顺序,A转账给B,至少要等A扣除钱了,才能通知B系统,这才是正常的流程。现在更改如下
MQ事务消息 - 图2
假如分为两步操作
1、执行本地事务(扣除A账户50元)
2、发送消息,发送消息成功(发送消息通知B系统给B增加50元)

异常情况 结果
本地事务执行失败 不进行2步骤执行
本地事务执行成功,消息发送失败 消息发送失败,要么补发消息,要么进行A事务回滚

两种情景,在设计的过程中,其实更偏向与第二种方式,因为它更符合正常的逻辑顺序 第一种方式需要跨B系统进行回滚事务,操作性更复杂 第二种方式,要么回滚,要么补发消息。操作的逻辑完全在A系统这边,操作起来更容易。但是面临回滚操作,当前只有B一个系统,如果此处操作涉及5个系统,其中一个环节出了问题,都要进行相应的回滚。系统越多,回滚逻辑也就越复杂。所以普通消息保证事务可以完成,但是需要做很多额外的操作保证回滚,或者补发,达到最终一致性的效果

事务消息

事务消息依赖于支持“事务消息”的消息队列,其基本思想是 利用消息中间间实施两阶段提交,将本地事务和发消息放在了一个分布式事务里,保证要么本地操作成功成功并且对外发消息成功,要么两者都失败。流程如下:    

  • 主事务向消息队列发送预备消息
  • 主事务收到ACK之后本地执行主事务
  • 根据执行的结果(成功或失败)向消息队列发送提交或者回滚消息
    详细的流程如下图所示:

MQ事务消息 - 图3

事务消息作为一种异步确保型事务, 将两个事务分支通过 MQ 进行异步解耦,RocketMQ 事务消息的设计流程同样借鉴了两阶段提交理论,整体交互流程如下图所示:
MQ事务消息 - 图4

  1. 事务发起方首先发送 prepare 消息到 MQ。
  2. 在发送 prepare 消息成功后执行本地事务。
  3. 根据本地事务执行结果返回 commit 或者是 rollback。
  4. 如果消息是 rollback,MQ 将删除该 prepare 消息不进行下发,如果是 commit 消息,MQ 将会把这个消息发送给 consumer 端。
  5. 如果执行本地事务过程中,执行端挂掉,或者超时,MQ 将会不停的询问其同组的其他 producer 来获取状态。
  6. Consumer 端的消费成功机制有 MQ 保证。

源码如下:

  1. public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg)
  2. throws MQClientException {
  3. SendResult sendResult = null;
  4. MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
  5. MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
  6. try {
  7. sendResult = this.send(msg);
  8. } catch (Exception e) {
  9. throw new MQClientException("send message Exception", e);
  10. }
  11. LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
  12. Throwable localException = null;
  13. switch (sendResult.getSendStatus()) {
  14. case SEND_OK: {
  15. try {
  16. if (sendResult.getTransactionId() != null) {
  17. msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
  18. }
  19. localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
  20. if (null == localTransactionState) {
  21. localTransactionState = LocalTransactionState.UNKNOW;
  22. }
  23. if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
  24. log.info("executeLocalTransactionBranch return {}", localTransactionState);
  25. log.info(msg.toString());
  26. }
  27. } catch (Throwable e) {
  28. log.info("executeLocalTransactionBranch exception", e);
  29. log.info(msg.toString());
  30. localException = e;
  31. }
  32. }
  33. break;
  34. case FLUSH_DISK_TIMEOUT:
  35. case FLUSH_SLAVE_TIMEOUT:
  36. case SLAVE_NOT_AVAILABLE:
  37. localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
  38. break;
  39. default:
  40. break;
  41. }
  42. try {
  43. this.endTransaction(sendResult, localTransactionState, localException);
  44. } catch (Exception e) {
  45. log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
  46. }
  47. TransactionSendResult transactionSendResult = new TransactionSendResult();
  48. transactionSendResult.setSendStatus(sendResult.getSendStatus());
  49. transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
  50. transactionSendResult.setMsgId(sendResult.getMsgId());
  51. transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
  52. transactionSendResult.setTransactionId(sendResult.getTransactionId());
  53. transactionSendResult.setLocalTransactionState(localTransactionState);
  54. return transactionSendResult;
  55. }

源码解析:
1、事务发起方首先发送 prepare 消息到 MQ。
2、如果prepare消息发送成功,则进行本地事务的实现LocalTransactionExecuter的执行
3、如果prepare消息发送失败,则设置LocalTransactionState.ROLLBACK_MESSAGE状态,进行回滚操作
4、MQ server会根据LocalTransactionState状态去commit/rollback这条prepare消息
5、如果commit,则发送到消费方去消费这条消息
6、如果MQ server没有收到LocalTransactionState,broker定时回查本地事务的执行结果。回查频率默认1分钟,如果超过检测次数(默认15次),消息会默认为丢弃,即回滚消息。回查需要实现TransactioCheckListener类。