概念介绍
- 事务消息:消息队列 MQ 提供类似 X/Open XA 的分布式事务功能,通过消息队列 MQ 事务消息能达到分布式事务的最终一致。
- 半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了消息队列 MQ 服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成 “暂不能投递” 状态,处于该种状态下的消息即半事务消息。
- 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列 MQ 服务端通过扫描发现某条消息长期处于 “半事务消息” 时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。
适用场景
事务消息的适用场景示例:
通过购物车进行下单的流程中,用户入口在购物车系统,交易下单入口在交易系统,两个系统之间的数据需要保持最终一致,这时可以通过事务消息进行处理。交易系统下单之后,发送一条交易下单的消息到消息队列 MQ,购物车系统订阅消息队列 MQ 的交易下单消息,做相应的业务处理,更新购物车数据。交互流程
事务消息交互流程如下图所示。
事务消息发送步骤如下:
- 发送方将半事务消息发送至消息队列 MQ 服务端。
- 消息队列 MQ 服务端将消息持久化成功之后,向发送方返回 Ack 确认消息已经发送成功,此时消息为半事务消息。
- 发送方开始执行本地事务逻辑。
- 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到 Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,订阅方将不会接受该消息。
事务消息回查步骤如下:
- 在断网或者是应用重启的特殊情况下,上述步骤 4 提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
- 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半事务消息进行操作。
注意事项
事务消息的 Group ID 不能与其他类型消息的 Group ID 共用。与其他类型的消息不同,事务消息有回查机制,回查时消息队列 MQ 服务端会根据 Group ID 去查询客户端。
- 通过 ONSFactory.createTransactionProducer 创建事务消息的 Producer 时必须指定 LocalTransactionChecker 的实现类,处理异常情况下事务消息的回查。
- 事务消息发送完成本地事务后,可在 execute 方法中返回以下三种状态:
- TransactionStatus.CommitTransaction:提交事务,允许订阅方消费该消息。
- TransactionStatus.RollbackTransaction:回滚事务,消息将被丢弃不允许消费。
- TransactionStatus.Unknow:暂时无法判断状态,等待固定时间以后消息队列 MQ 服务端向发送方进行消息回查。
- 可通过以下方式给每条消息设定第一次消息回查的最快时间:
- Message message = new Message();
- // 在消息属性中添加第一次消息回查的最快时间,单位秒。例如,以下设置实际第一次回查时间为 120 秒 ~ 125 秒之间
- message.putUserProperties(PropertyKeyConst.CheckImmunityTimeInSeconds,”120”);
- // 以上方式只确定事务消息的第一次回查的最快时间,实际回查时间向后浮动 0 秒 ~ 5 秒;如第一次回查后事务仍未提交,后续每隔 5 秒回查一次
如何使用
此处我引用官网的 demo,进行简单说明。
- 事务状态:rocketmq 定义了三种事务状态
- TransactionStatus.CommitTransaction:消息提交,当消息状态为 CommitTransaction,表示允许消费者允许消费当前消息
- TransactionStatus.RollbackTransaction:消息回滚,表示 MQ 服务端将会删除当前半消息,不允许消费者消费。
- TransactionStatus.Unknown:中间状态,表示 MQ 服务需要发起回查操作,检测当前发送方本地事务的执行状态。
发送事务消息
创建事务消息生产者使用 TransactionMQProducer 创建消息发送客户端。并指定一个唯一的生产者组 producerGroup,当执行完本地事务,需要返回给 MQ 服务端执行结果,返回上面的三种事务状态。CommitTransaction、RollbackTransaction、Unknown
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
实现 TransactionListener 接口实现 executeLocalTransaction 方法。消息生产者需要在 executeLocalTransaction 中执行本地事务当事务半消息提交成功,执行完毕后需要返回事务状态码。
实现 checkLocalTransaction 方法,该方法用于进行本地事务执行情况回查,并回应事务状态给 MQ 的 broker,执行完成之后需要返回对应的事务状态码。 ```sql public class TransactionListenerImpl implements TransactionListener { private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap
@Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { int value = transactionIndex.getAndIncrement(); int status = value % 3; localTrans.put(msg.getTransactionId(), status); return LocalTransactionState.UNKNOW; }
@Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { Integer status = localTrans.get(msg.getTransactionId()); if (null != status) { switch (status) { case 0: return LocalTransactionState.UNKNOW; case 1: return LocalTransactionState.COMMIT_MESSAGE; case 2: return LocalTransactionState.ROLLBACK_MESSAGE; } } return LocalTransactionState.COMMIT_MESSAGE; } } ``` 对于消费者,需要通过业务参数保证消费的幂等。
附录–RocketMQ 事务消息实现原理
有些同学想要深入了解 RocketMQ 实现事务消息操作的原理,这里我引用一下官方的一段博客的内容,具体的地址为:里程碑 | Apache RocketMQ 正式开源分布式事务消息
RocketMQ 事务消息在实现上充分利用了 RocketMQ 本身机制,在实现零依赖的基础上,同样实现了高性能、可扩展、全异步等一系列特性。
在具体实现上,RocketMQ 通过使用 Half Topic 以及 Operation Topic 两个内部队列来存储事务消息推进状态,如下图所示:
其中,Half Topic 对应队列中存放着 prepare 消息,Operation Topic 对应的队列则存放了 prepare message 对应的 commit/rollback 消息,消息体中则是 prepare message 对应的 offset,服务端通过比对两个队列的差值来找到尚未提交的超时事务,进行回查。
在具体实现上,事务消息作为普通消息的一个应用场景,在实现过程中进行了分层抽象,从而避免了对 RocketMQ 原有存储机制的修改,如下图所示:
从用户侧来说,用户需要分别实现本地事务执行以及本地事务回查方法,因此只需关注本地事务的执行状态即可;而在 service 层,则对事务消息的两阶段提交进行了抽象,同时针对超时事务实现了回查逻辑,通过不断扫描当前事务推进状态,来不断反向请求 Producer 端获取超时事务的执行状态,在避免事务挂起的同时,也避免了 Producer 端的单点故障。而在存储层,RocketMQ 通过 Bridge 封装了与底层队列存储的相关操作,用以操作两个对应的内部队列,用户也可以依赖其它他存储介质实现自己的 service,RocketMQ 会通过 ServiceProvider 加载进来。
从上述事务消息设计中可以看到,RocketMQ 事务消息较好的解决了事务的最终一致性问题,事务发起方仅需要关注本地事务执行以及实现回查接口给出事务状态判定等实现,而且在上游事务峰值高时,可以通过消息队列,避免对下游服务产生过大压力。
事务消息不仅适用于上游事务对下游事务无依赖的场景,还可以与一些传统分布式事务架构相结合,而 MQ 的服务端作为天生的具有高可用能力的协调者,使得我们未来可以基于 RocketMQ 提供一站式轻量级分布式事务解决方案,用以满足各种场景下的分布式事务需求。