一、事务消息的由来

1、案例

引用官方的购物案例:
小明购买一个100元的东西,账户扣款100元的同时需要保证在下游的积分系统给小明这个账号增加100积分。账号系统和积分系统是两个独立是系统,一个要减少100元,一个要增加100积分。如下图:
RocketMQ事务消息 - 图1

2、问题

  • 账号服务扣款成功了,通知积分系统也成功了,但是积分增加的时候失败了,数据不一致了。
  • 账号服务扣款成功了,但是通知积分系统失败了,所以积分不会增加,数据不一致了。

    3、方案

  • RocketMQ针对第一个问题解决方案是:如果消费失败了,是会自动重试的,如果重试几次后还是消费失败,那么这种情况就需要人工解决了,比如放到死信队列里然后手动查原因进行处理等。

  • RocketMQ针对第二个问题解决方案是:如果你扣款成功了,但是往mq写消息的时候失败了,那么RocketMQ会进行回滚消息的操作,这时候我们也能回滚我们扣款的操作。

    二、事务消息的原理

    1、原理图解

    RocketMQ事务消息 - 图2

    2、详细过程

    1.Producer发送半消息(Half Message)到broker。

    我真想吐槽一句为啥叫半消息,难以理解,其实这就是prepare message,预发送消息。

  • Half Message发送成功后开始执行本地事务。

  • 如果本地事务执行成功的话则返回commit,如果执行失败则返回rollback。(这个是在事务消息的回调方法里由开发者自己决定commit or rollback)

Producer发送上一步的commit还是rollback到broker,这里有两种情况:
1.如果broker收到了commit/rollback消息 :

  • 如果收到了commit,则broker认为整个事务是没问题的,执行成功的。那么会下发消息给Consumer端消费。
  • 如果收到了rollback,则broker认为本地事务执行失败了,broker将会删除Half Message,不下发给Consumer端。

2.如果broker未收到消息(如果执行本地事务突然宕机了,相当本地事务执行结果返回unknow,则和broker未收到确认消息的情况一样处理。):

  • broker会定时回查本地事务的执行结果:如果回查结果是本地事务已经执行则返回commit,若未执行,则返回rollback。
  • Producer端回查的结果发送给Broker。Broker接收到的如果是commit,则broker视为整个事务执行成功,如果是rollback,则broker视为本地事务执行失败,broker删除Half Message,不下发给consumer。如果broker未接收到回查的结果(或者查到的是unknow),则broker会定时进行重复回查,以确保查到最终的事务结果。重复回查的时间间隔和次数都可配。

    三、事务消息实现流程

    1、实现流程

    RocketMQ事务消息 - 图3
    简单来看就是:事务消息是个监听器,有回调函数,回调函数里我们进行业务逻辑的操作,比如给账户-100元,然后发消息到积分的mq里,这时候如果账户-100成功了,且发送到mq成功了,则设置消息状态为commit,这时候broker会将这个半消息发送到真正的topic中。一开始发送他是存到半消息队列里的,并没存在真实topic的队列里。只有确认commit后才会转移。

    2、补救方案

    如果事务因为中断,或是其他的网络原因,导致无法立即响应的,RocketMQ当做UNKNOW处理,RocketMQ事务消息还提供了一个补救方案:定时查询事务消息的事务状态。这也是一个回调函数,这里面可以做补偿,补偿逻辑开发者自己写,成功的话自己返回commit就完事了。

    四、事务消息代码实例

    1、代码

    ```java package com.chentongwei.mq.rocketmq;

import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.client.producer.TransactionSendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt;

import java.util.Date;

/**

  • Description: *
  • @author TongWei.Chen 2020-06-21 11:32:58 */ public class ProducerTransaction2 { public static void main(String[] args) throws Exception {

    1. TransactionMQProducer producer = new TransactionMQProducer("my-transaction-producer");
    2. producer.setNamesrvAddr("124.57.180.156:9876");
    3. // 回调
    4. producer.setTransactionListener(new TransactionListener() {
    5. @Override
    6. public LocalTransactionState executeLocalTransaction(Message message, Object arg) {
    7. LocalTransactionState state = null;
    8. //msg-4返回COMMIT_MESSAGE
    9. if(message.getKeys().equals("msg-1")){
    10. state = LocalTransactionState.COMMIT_MESSAGE;
    11. }
    12. //msg-5返回ROLLBACK_MESSAGE
    13. else if(message.getKeys().equals("msg-2")){
    14. state = LocalTransactionState.ROLLBACK_MESSAGE;
    15. }else{
    16. //这里返回unknown的目的是模拟执行本地事务突然宕机的情况(或者本地执行成功发送确认消息失败的场景)
    17. state = LocalTransactionState.UNKNOW;
    18. }
    19. System.out.println(message.getKeys() + ",state:" + state);
    20. return state;
    21. }
    22. /**
    23. * 事务消息的回查方法
    24. */
    25. @Override
    26. public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
    27. if (null != messageExt.getKeys()) {
    28. switch (messageExt.getKeys()) {
    29. case "msg-3":
    30. System.out.println("msg-3 unknow");
    31. return LocalTransactionState.UNKNOW;
    32. case "msg-4":
    33. System.out.println("msg-4 COMMIT_MESSAGE");
    34. return LocalTransactionState.COMMIT_MESSAGE;
    35. case "msg-5":
    36. //查询到本地事务执行失败,需要回滚消息。
    37. System.out.println("msg-5 ROLLBACK_MESSAGE");
    38. return LocalTransactionState.ROLLBACK_MESSAGE;
    39. }
    40. }
    41. return LocalTransactionState.COMMIT_MESSAGE;
    42. }
    43. });
    44. producer.start();
    45. //模拟发送5条消息
    46. for (int i = 1; i < 6; i++) {
    47. try {
    48. Message msg = new Message("transactionTopic", null, "msg-" + i, ("测试,这是事务消息! " + i).getBytes());
    49. producer.sendMessageInTransaction(msg, null);
    50. } catch (Exception e) {
    51. e.printStackTrace();
    52. }
    53. }

    } } ```

    2、结果

    ```java msg-1,state:COMMIT_MESSAGE msg-2,state:ROLLBACK_MESSAGE msg-3,state:UNKNOW msg-4,state:UNKNOW msg-5,state:UNKNOW

msg-3 unknow msg-3 unknow msg-5 ROLLBACK_MESSAGE msg-4 COMMIT_MESSAGE

msg-3 unknow msg-3 unknow msg-3 unknow msg-3 unknow

  1. <a name="t5igI"></a>
  2. ## 3、管控台
  3. ![](https://cdn.nlark.com/yuque/0/2022/webp/2436487/1643805987098-3b2e3b39-6c0c-44a2-9903-507e03dfa417.webp#clientId=ua6c25e79-d99d-4&crop=0&crop=0&crop=1&crop=1&from=paste&id=u0733ee65&margin=%5Bobject%20Object%5D&originHeight=136&originWidth=1080&originalType=url&ratio=1&rotation=0&showTitle=false&status=done&style=none&taskId=u602e0523-f86a-4ea7-b9b8-523efe17019&title=)
  4. <a name="Qw83Q"></a>
  5. ## 4、结果分析
  6. - 只有msg-1和msg-4发送成功了。msg-4在msg-1前面是因为msg-1先成功的,msg-4是回查才成功的。按时间倒序来的。
  7. - 先来输出五个结果,对应五条消息
  8. ```java
  9. msg-1,state:COMMIT_MESSAGE
  10. msg-2,state:ROLLBACK_MESSAGE
  11. msg-3,state:UNKNOW
  12. msg-4,state:UNKNOW
  13. msg-5,state:UNKNOW
  • 然后进入了回查,msg-3还是unknow,msg-5回滚了,msg-4提交了事务。所以这时候msg-4在管控台里能看到了。
  • 过了一段时间再次回查msg-3,发现还是unknow,所以一直回查。

回查的时间间隔和次数都是可配的,默认是回查15次还失败的话就会把这个消息丢掉了。

五、疑问

疑问:Spring事务、常规的分布式事务不行吗?Rocketmq的事务是否多此一举了呢?
MQ用于解耦,之前是分布式事务直接操作了账号系统和积分系统。但是他两就是强耦合的存在,如果中间插了个mq,账号系统操作完发消息到mq,这时候只要保证发送成功就提交,发送失败则回滚,这步怎么保证,就是靠事务了。而且用RocketMQ做分布式事务的也蛮多的。