RocketMQ - 可靠消息最终一致性事务解决方案

1. RocketMQ 事务消息方案概述

RocketMQ 是一个来自阿里巴巴的分布式消息中间件,于 2012 年开源,并在 2017 年正式成为 Apache 顶级项目。据了解,包括阿里云上的消息产品以及收购的子公司在内,阿里集团的消息产品全线都运行在 RocketMQ 之上,并且最近几年的双十一大促中,RocketMQ 都有抢眼表现。Apache RocketMQ 4.3之后的版本正式支持事务消息,为分布式事务实现提供了便利性支持。
RocketMQ 事务消息设计则主要是为了解决 Producer 端的消息发送与本地事务执行的原子性问题,RocketMQ 的设计中 broker 与 producer 端的双向通信能力,使得 broker 天生可以作为一个事务协调者存在;而 RocketMQ 本身提供的存储机制为事务消息提供了持久化能力;RocketMQ 的高可用机制以及可靠消息设计则为事务消息在系统发生异常时依然能够保证达成事务的最终一致性。
在 RocketMQ 4.3 后实现了完整的事务消息,实际上其实是对本地消息表的一个封装,将本地消息表移动到了 MQ 内部,解决 Producer 端的消息发送与本地事务执行的原子性问题。

1.1. 事务消息执行流程图

406519583666
事务消息执行流程如下:

  1. 发送方发送一个事务消息给 Broker,RocketMQ 会将消息状态标记为“Prepared”(预备状态),此时这条消息暂时不能被接收方消费。这样的消息称之为 Half Message,即半消息。
  2. Broker 返回发送成功给发送方
  3. 发送方执行本地事务(例如,操作数据库),Producer 端执行业务代码逻辑,通过本地数据库事务控制。
  4. 若本地事务执行成功,发送 commit 消息给 Broker,RocketMQ 会将消息状态标记为“可消费”,此时这条消息就可以被接收方消费;若本地事务执行失败,发送 rollback 消息给 Broker,RocketMQ 将删除该消息。
  5. 如果发送方在本地事务过程中,出现服务挂掉,网络闪断或者超时,那 Broker 将无法收到确认结果。此时 RocketMQ 将会不停的询问发送方来获取本地事务的执行状态,这个过程叫事务回查
  6. 发送方会检查本地的事务状态
  7. Broker 会根据事务回查的结果来决定 Commit 或 Rollback,这样就保证了消息发送与本地事务同时成功或同时失败。

    1.2. RocketMQ 本地事务执行与回查实现

    以上主干流程已由 RocketMQ 实现,对用户侧来说,用户只需分别实现本地事务执行以及本地事务回查方法,因此只需关注本地事务的执行状态即可。
    RoacketMQ 提供 RocketMQLocalTransactionListener 接口: ``` public interface RocketMQLocalTransactionListener {

    /**

    • 发送prepare消息成功此方法被回调,该方法用于执行本地事务 *
    • @param msg 回传的消息,利用transactionId即可获取到该消息的唯一Id
    • @param arg 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,这里能获取到
    • @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调 */ RocketMQLocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

      /**

    • 用于本地事务状态回查 *
    • @param msg 通过获取 transactionId 来判断这条消息的本地事务执行状态
    • @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调 */ RocketMQLocalTransactionState checkLocalTransaction(final Message msg); }
  1. ### 1.3. RocketMQ 用于发送事务消息的 API

TransactionMQProducer producer = new TransactionMQProducer(“ProducerGroup”); producer.setNamesrvAddr(“127.0.0.1:9876”); producer.start(); // 设置TransactionListener实现 producer.setTransactionListener(transactionListener); // 发送事务消息 SendResult sendResult = producer.sendMessageInTransaction(msg, null);

  1. ## 2. RocketMQ 事务消息快速入门
  2. ### 2.1. 案例业务说明
  3. 本实例通过 RocketMQ 可靠消息实现最终一致性,模拟两个账户的转账交易过程。两个账户分别在不同的银行(张三在bank1、李四在bank2),bank1bank2是两个相互独立的微服务。交易过程是,张三给李四转账指定金额。<br />上述交易步骤,张三扣减金额与给bank2发转账消息,两个操作必须是一个整体性的事务。<br />![382484562888](https://gitee.com/moonzero/images/raw/master/code-note/80391022220361.jpg)<br />本示例程序技术架构如下:<br />![](https://gitee.com/moonzero/images/raw/master/code-note/250295022238787.png)<br />交互流程如下:
  4. 1. Bank1 MQ Server 发送转账消息
  5. 1. Bank1 执行本地事务,扣减金额
  6. 1. Bank2 接收消息,执行本地事务,添加金额
  7. ### 2.2. 环境搭建
  8. #### 2.2.1. 环境要求
  9. - 数据库:MySQL 5.7.25
  10. - 两个数据库:bank1bank2
  11. - JDK64 JDK 1.8.0_311
  12. - rocketmq 服务端:RocketMQ-4.9.2
  13. - rocketmq 客户端:RocketMQ-Spring-Boot-starter 2.0.2
  14. - 微服务框架:spring-boot-2.1.3.RELEASEspring-cloud-Greenwich.RELEASE
  15. - 微服务及数据库的关系
  16. - ensure-message-demo-bank1 服务,操作张三账户,连接数据库bank1
  17. - ensure-message-demo-bank2 服务,操作李四账户,连接数据库bank2
  18. #### 2.2.2. 数据库
  19. 执行以下脚本,创建测试数据库、表与测试数据

— 创建 bank1 库,并导入以下表结构和数据: DROP DATABASE IF EXISTS bank1; CREATE DATABASE bank1 CHARACTER SET ‘utf8’ COLLATE ‘utf8_general_ci’;

USE bank1; DROP TABLE IF EXISTS account_info; CREATE TABLE account_info ( id bigint(20) NOT NULL AUTO_INCREMENT, account_name varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT ‘户主姓名’, account_no varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT ‘银行卡号’, account_password varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT ‘帐户密码’, account_balance double NULL DEFAULT NULL COMMENT ‘帐户余额’, PRIMARY KEY (id) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;

INSERT INTO account_info VALUES (1, ‘张三’, ‘1’, ‘’, 10000);

DROP TABLE IF EXISTS de_duplication; CREATE TABLE de_duplication ( tx_no bigint(20) NOT NULL, create_time datetime(0) NULL DEFAULT NULL, PRIMARY KEY (tx_no) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;

— 创建bank2库,并导入以下表结构和数据: DROP DATABASE IF EXISTS bank2; CREATE DATABASE bank2 CHARACTER SET ‘utf8’ COLLATE ‘utf8_general_ci’;

USE bank2; DROP TABLE IF EXISTS account_info; CREATE TABLE account_info ( id bigint(20) NOT NULL AUTO_INCREMENT, account_name varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT ‘户主姓名’, account_no varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT ‘银行卡号’, account_password varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT ‘帐户密码’, account_balance double NULL DEFAULT NULL COMMENT ‘帐户余额’, PRIMARY KEY (id) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic; INSERT INTO account_info VALUES (2, ‘李四’, ‘2’, NULL, 0);

DROP TABLE IF EXISTS de_duplication; CREATE TABLE de_duplication ( tx_no bigint(20) NOT NULL, create_time datetime(0) NULL DEFAULT NULL, PRIMARY KEY (tx_no) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;

  1. > 注:其中 de_duplication 交易记录表(去重表),即是用于实现幂等性
  2. #### 2.2.3. 启动 RocketMQ
  3. > RocketMQ 详细教程详见[《分布式消息中件间 RocketMQ》笔记](/07-%E5%88%86%E5%B8%83%E5%BC%8F%E6%9E%B6%E6%9E%84&%E5%BE%AE%E6%9C%8D%E5%8A%A1%E6%9E%B6%E6%9E%84/07-%E5%88%86%E5%B8%83%E5%BC%8F%E6%B6%88%E6%81%AF%E4%B8%AD%E4%BB%B6%E9%97%B4/04-RocketMQ)
  4. 到官网下载 RocketMQ 到本地并配置,到安装目录的 bin 文件中执行下面命令启动 RocketMQ

start mqnamesrv.cmd start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

  1. 启动 RocketMQ 图形化管理后台

java -jar rocketmq-console-ng-1.0.0.jar —server.port=7777 —rocketmq.config.namesrvAddr=127.0.0.1:9876

  1. ### 2.3. 创建 Maven 示例工程
  2. #### 2.3.1. 聚合工程
  3. - 创建 pom 聚合工程 ensure-message-demo,进行依赖管理

<?xml version=”1.0” encoding=”UTF-8”?>

4.0.0 com.moon ensure-message-demo 1.0-SNAPSHOT pom ${project.artifactId} RocketMQ 实现可靠消息最终一致性的分布式事务解决方案基础示例 ensure-message-demo-bank1 ensure-message-demo-bank2 UTF-8 UTF-8 1.8 org.projectlombok lombok 1.18.0 javax.servlet javax.servlet-api 3.1.0 provided javax.interceptor javax.interceptor-api 1.2 mysql mysql-connector-java 5.1.47 org.mybatis.spring.boot mybatis-spring-boot-starter 2.0.0 com.alibaba druid-spring-boot-starter 1.1.16 org.apache.rocketmq rocketmq-spring-boot-starter 2.0.2 commons-lang commons-lang 2.6 org.springframework.cloud spring-cloud-dependencies Greenwich.RELEASE pom import org.springframework.boot spring-boot-dependencies 2.1.3.RELEASE pom import ${project.name} src/main/resources true /* src/main/java /*.xml org.springframework.boot spring-boot-maven-plugin org.apache.maven.plugins maven-compiler-plugin 1.8 1.8 maven-resources-plugin utf-8 true
  1. #### 2.3.2. 创建微服务
  2. - 创建 ensure-message-demo-bank1 工程,负责张三账户操作;创建 ensure-message-demo-bank2 工程,负责李四账户操作。同样引入以下依赖:
org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-actuator org.springframework.boot spring-boot-configuration-processor true org.mybatis.spring.boot mybatis-spring-boot-starter com.alibaba druid-spring-boot-starter mysql mysql-connector-java javax.interceptor javax.interceptor-api org.projectlombok lombok org.apache.rocketmq rocketmq-spring-boot-starter
  1. ### 2.4. 案例功能实现
  2. 此部分两个微服务工程的具体实现
  3. #### 2.4.1. ensure-message-demo-bank1 消息发送方工程
  4. ##### 2.4.1.1. 项目配置文件
  5. - 项目配置 application.properties

spring.application.name=ensure-message-demo-bank1

server.port=56081 server.servlet.context-path=/bank1 spring.http.encoding.enabled=true spring.http.encoding.charset=UTF-8 spring.mvc.throw-exception-if-no-handler-found=true

spring.datasource.type=com.alibaba.druid.pool.DruidDataSource spring.datasource.driver-class-name=com.mysql.jdbc.Driver spring.datasource.url=jdbc:mysql://localhost:3306/bank1?useUnicode=true&useSSL=false spring.datasource.username=root spring.datasource.password=123456

rocketmq.producer.group=producer_ensure_bank1 rocketmq.name-server=127.0.0.1:9876

  1. ##### 2.4.1.2. 实体类
  2. - 定义封装转账消息的实体类

@Data @AllArgsConstructor @NoArgsConstructor public class AccountChangeEvent implements Serializable { private static final long serialVersionUID = 7726052118200407735L; /**

  1. * 账号
  2. */
  3. private String accountNo;
  4. /**
  5. * 变动金额
  6. */
  7. private double amount;
  8. /**
  9. * 事务号,时间戳
  10. */
  11. private long txNo;

}

  1. ##### 2.4.1.3. 持久层相关接口
  2. 创建持久层接口,定义扣减账户余额、查询账户信息、查询事务记录、保存事务记录等4个方法

@Mapper @Component public interface AccountInfoDao { /**

  1. * 扣减某账号的余额
  2. *
  3. * @param accountNo 账号
  4. * @param amount 变动金额
  5. * @return
  6. */
  7. @Update("update account_info set account_balance=account_balance-#{amount} where account_no=#{accountNo}")
  8. int subtractAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);
  9. /**
  10. * 查询某账号信息
  11. *
  12. * @param accountNo 账号
  13. * @return
  14. */
  15. @Select("select * from account_info where where account_no=#{accountNo}")
  16. AccountInfo findByIdAccountNo(@Param("accountNo") String accountNo);
  17. /**
  18. * 查询某事务记录是否已执行
  19. *
  20. * @param txNo 事务编号
  21. * @return
  22. */
  23. @Select("select count(1) from de_duplication where tx_no = #{txNo}")
  24. int isExistTx(long txNo);
  25. /**
  26. * 保存某事务执行记录
  27. *
  28. * @param txNo 事务编号
  29. * @return
  30. */
  31. @Insert("insert into de_duplication values(#{txNo},now());")
  32. int addTx(long txNo);

}

  1. ##### 2.4.1.4. 实现发送转账消息
  2. 封装 RocketMQ 发送消息处理类,定义通过 `RocketMQTemplate` 发送转账消息的方法

@Component public class BankMessageProducer { @Resource private RocketMQTemplate rocketMQTemplate;

  1. public void sendAccountChangeEvent(AccountChangeEvent accountChangeEvent) {
  2. // 构造消息
  3. JSONObject jsonObject = new JSONObject();
  4. jsonObject.put("accountChange", accountChangeEvent);
  5. // 转成 json 字符串
  6. Message<String> msg = MessageBuilder.withPayload(jsonObject.toJSONString()).build();
  7. // 发送消息
  8. rocketMQTemplate.sendMessageInTransaction("producer_ensure_transfer", "topic_ensure_transfer", msg, null);
  9. }

}

  1. ##### 2.4.1.5. 业务层接口与实现
  2. 业务层接口,分别定义发送事务消息(`sendUpdateAccountBalanceMsg`)与本地事务扣减金额(`doUpdateAccountBalance`)方法

public interface AccountInfoService {

  1. /**
  2. * 更新帐号余额-发送消息
  3. *
  4. * @param accountChange
  5. */
  6. void sendUpdateAccountBalanceMsg(AccountChangeEvent accountChange);
  7. /**
  8. * 更新帐号余额-本地事务
  9. *
  10. * @param accountChange
  11. */
  12. void doUpdateAccountBalance(AccountChangeEvent accountChange);

}

  1. 业务实现类。注意:`doUpdateAccountBalance` 方法中的本地事务若执行成功,就会在交易记录去重表(de_duplication)保存一条数据。

@Service public class AccountInfoServiceImpl implements AccountInfoService {

  1. @Autowired
  2. private BankMessageProducer bankMessageProducer;
  3. @Autowired
  4. private AccountInfoDao accountInfoDao;
  5. /**
  6. * 更新帐号余额-发送消息
  7. *
  8. * @param accountChange
  9. */
  10. @Override
  11. public void sendUpdateAccountBalanceMsg(AccountChangeEvent accountChange) {
  12. bankMessageProducer.sendAccountChangeEvent(accountChange);
  13. }
  14. /**
  15. * 更新帐号余额-本地事务
  16. *
  17. * @param accountChange
  18. */
  19. @Override
  20. @Transactional(isolation = Isolation.SERIALIZABLE)
  21. public void doUpdateAccountBalance(AccountChangeEvent accountChange) {
  22. // 扣减余额
  23. accountInfoDao.subtractAccountBalance(accountChange.getAccountNo(), accountChange.getAmount());
  24. // 新增交易记录(与扣减余额操作在同一个事务中)
  25. accountInfoDao.addTx(accountChange.getTxNo());
  26. }

}

  1. ##### 2.4.1.6. RocketMQ 事务消息监听器
  2. 创建 RocketMQ 事务消息监听器,需要实现 `org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener` 接口,并在类上标识 `@RocketMQTransactionListener` 注解,其中 `txProducerGroup` 属性是用于指定监听的消息分组名称<br />实现接口中的方法,功能分别是:
  3. - `executeLocalTransaction`:该方法执行本地事务,会在发送半消息后,被 RocketMQ 自动调用
  4. - `checkLocalTransaction`:该方法实现事务回查,利用了交易记录去重表(de_duplication),会在无法收到确认消息时,被 RocketMQ 自动调用

@Component @Slf4j @RocketMQTransactionListener(txProducerGroup = “producer_ensure_transfer”) public class TransferTransactionListenerImpl implements RocketMQLocalTransactionListener {

  1. @Autowired
  2. private AccountInfoService accountInfoService;
  3. @Autowired
  4. private AccountInfoDao accountInfoDao;
  5. /**
  6. * 执行本地事务
  7. *
  8. * @param msg
  9. * @param arg
  10. * @return
  11. */
  12. @Override
  13. public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
  14. // 1.接收并解析消息
  15. final JSONObject jsonObject = JSON.parseObject(new String((byte[]) msg.getPayload()));
  16. AccountChangeEvent accountChangeEvent = JSONObject
  17. .parseObject(jsonObject.getString("accountChange"), AccountChangeEvent.class);
  18. try {
  19. // 2.执行本地事务
  20. accountInfoService.doUpdateAccountBalance(accountChangeEvent);
  21. // 3.返回执行结果
  22. return RocketMQLocalTransactionState.COMMIT;
  23. } catch (Exception e) {
  24. return RocketMQLocalTransactionState.ROLLBACK;
  25. }
  26. }
  27. /**
  28. * 执行事务回查
  29. *
  30. * @param msg
  31. * @return
  32. */
  33. @Override
  34. public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
  35. log.info("TransferTransactionListenerImpl 执行事务回查");
  36. // 1.接收并解析消息
  37. final JSONObject jsonObject = JSON.parseObject(new String((byte[]) msg.getPayload()));
  38. AccountChangeEvent accountChangeEvent = JSONObject
  39. .parseObject(jsonObject.getString("accountChange"), AccountChangeEvent.class);
  40. // 2.查询de_duplication表
  41. int isExistTx = accountInfoDao.isExistTx(accountChangeEvent.getTxNo());
  42. // 3.根据查询结果返回值。(交易记录表存在记录,则说明本地事务成功)
  43. return isExistTx > 0 ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
  44. }

}

  1. ##### 2.4.1.7. 控制层
  2. 定义请求控制层方法,发送事务消息

@RestController public class AccountInfoController {

  1. @Autowired
  2. private AccountInfoService accountInfoService;
  3. @GetMapping("/transfer")
  4. public String transfer() {
  5. accountInfoService.sendUpdateAccountBalanceMsg(new AccountChangeEvent("1", 100, System.currentTimeMillis()));
  6. return "转账成功";
  7. }

}

  1. #### 2.4.2. ensure-message-demo-bank2 消息接收方工程
  2. ##### 2.4.2.1. 项目配置文件
  3. - 项目配置 application.properties

spring.application.name=ensure-message-demo-bank2

server.port=56082 server.servlet.context-path=/bank2 spring.http.encoding.enabled=true spring.http.encoding.charset=UTF-8 spring.mvc.throw-exception-if-no-handler-found=true

spring.datasource.type=com.alibaba.druid.pool.DruidDataSource spring.datasource.driver-class-name=com.mysql.jdbc.Driver spring.datasource.url=jdbc:mysql://localhost:3306/bank2?useUnicode=true&useSSL=false spring.datasource.username=root spring.datasource.password=123456

rocketmq.producer.group=producer_ensure_bank2 rocketmq.name-server=127.0.0.1:9876

  1. ##### 2.4.2.2. 持久层与实体类
  2. 持久层接口、数据库表实体、消息实体均与 ensure-message-demo-bank1 工程几乎一样,只修改了增加账户余额方法名称与sql

@Update(“update account_info set account_balance=account_balance+#{amount} where account_no=#{accountNo}”) int addAccountBalance(@Param(“accountNo”) String accountNo, @Param(“amount”) Double amount);

  1. ##### 2.4.2.3. 业务层接口与实现
  2. 业务层接口定义增加账户余额方法

public interface AccountInfoService { /**

  1. * 更新帐号余额
  2. *
  3. * @param accountChange
  4. */
  5. void updateAccountBalance(AccountChangeEvent accountChange);

}

  1. 值得注意的是:在实现类方法中使用交易记录去重表(de_duplication)来实现幂等性控制

@Service @Slf4j public class AccountInfoServiceImpl implements AccountInfoService {

  1. @Autowired
  2. private AccountInfoDao accountInfoDao;
  3. @Override
  4. @Transactional(isolation = Isolation.SERIALIZABLE)
  5. public void updateAccountBalance(AccountChangeEvent accountChange) {
  6. log.info("bank2 工程 AccountInfoServiceImpl 执行本地事务");
  7. int isExistsTx = accountInfoDao.isExistTx(accountChange.getTxNo());
  8. if (isExistsTx == 0) {
  9. // 当交易记录表没有记录,才新增
  10. accountInfoDao.addAccountBalance(accountChange.getAccountNo(), accountChange.getAmount());
  11. accountInfoDao.addTx(accountChange.getTxNo());
  12. }
  13. }

}

  1. ##### 2.4.2.4. RocketMQ 事务消息监听器
  2. 创建消费 RocketMQ 事务消息监听器,需要实现 `org.apache.rocketmq.spring.core.RocketMQListener<T>` 接口,泛型 T 是消息的数据类型。在类上标识 `@RocketMQMessageListener` 注解,`topic` 属性指定消息的主题;`consumerGroup` 属性指定消息的分组

@Component @Slf4j @RocketMQMessageListener(topic = “topic_ensure_transfer”, consumerGroup = “consumer_ensure_transfer”) public class EnsureMessageConsumer implements RocketMQListener {

  1. @Autowired
  2. private AccountInfoService accountInfoService;
  3. @Override
  4. public void onMessage(String message) {
  5. log.info("EnsureMessageConsumer 消费消息:{}", message);
  6. // 1.解析消息
  7. final JSONObject jsonObject = JSON.parseObject(message);
  8. AccountChangeEvent accountChangeEvent = JSONObject
  9. .parseObject(jsonObject.getString("accountChange"), AccountChangeEvent.class);
  10. // 2.执行本地事务
  11. accountChangeEvent.setAccountNo("2");
  12. accountInfoService.updateAccountBalance(accountChangeEvent);
  13. }

}

```

2.5. 功能测试

  • bank1 和 bank2 都成功
  • bank1 执行本地事务失败,则 bank2 接收不到转账消息。(在AccountInfoServiceImpl.doUpdateAccountBalance方法中模拟异常)
  • bank1 执行完本地事务后,不返回任何信息,则 Broker 会进行事务回查。(在TransferTransactionListenerImpl.executeLocalTransaction方法返回结果前模拟异常)
  • bank2 执行本地事务失败,会进行重试消费。(在 bank2 工程 AccountInfoServiceImpl.updateAccountBalance方法中模拟异常)

    3. 总结

    可靠消息最终一致性就是保证消息从生产方经过消息中间件传递到消费方的一致性,本案例使用了 RocketMQ 作为消息中间件,RocketMQ 主要解决了两个功能:
  1. 本地事务与消息发送的原子性问题。
  2. 事务参与方接收消息的可靠性。

可靠消息最终一致性事务适合执行周期长且实时性要求不高的场景。引入消息机制后,同步的事务操作变为基于消息执行的异步操作,避免了分布式事务中的同步阻塞操作的影响,并实现了两个服务的解耦。