事务消息只和生产者有关系,和消费者没有太大的关系,

事务复杂流程

RocketMQ的事务消息的原理介绍和demo使用 - 图1

订单系统生产者通过RocketMQ发送消息给消费者

1.首先会把我们在代码中发的普通消息转成一个half消息,这个half对消费者是不可见的.
2.RocketMQ会返回half消息接收的结果
3.订单系统接收到了half消息结果之后会去执行一个本地事务,就是执行executeLocalTransaction方法
4.订单系统向RocketMQ返回一个状态,状态包括commit,rollback,unknown 三种状态,
如果是commit状态的话,RocketMQ就把消息给下游服务消费者去处理消费
如果是rollback状态就直接丢弃这个消息

  1. 如果是unknown状态的话RocketMQ就会等一段时间去订单系统生产者进行回查事务的状态,也就是去执行checkLocalTransaction方法,
  2. 订单系统生产者接收到了RocketMQ的状态回查请求后就会去执行checkLocalTransaction方法去检查本地事务状态,也相当于执行本地事务
  3. 向RocketMQ返回一个本地事务的检查状态,这个状态结果同样是有三个的,commit,rollback,unknown 三种状态,然后还会接着重复类似步骤4的操作,就是
    如果是commit状态的话,RocketMQ就把消息给下游服务消费者去处理消费
    如果是rollback状态就直接丢弃这个消息
    如果是unknown状态接着等一段时间去订单系统生产者进行回查事务的状态.
    如果你消息一直都是unknown状态的话,会一直循环的查询,当然RocketMQ也会有个回查次数,默认是15次,如果回查15次之后消息状态还是unknown的话,RocketMQ就会确认你这个事务状态是失败的,就会丢弃这个消息.

分什么要有事务机制(使用场景)

RocketMQ的事务消息的原理介绍和demo使用 - 图2

half消息的作用

订单系统需要下订单,下完订单需要等待支付,支付完了之后要推送给下游服务进行营销,比如说给客户发红包,给客户下物流单等等,

如果没有事务消息,你可能是先下单操作,然后再发送RocketMQ,如果RocketMQ宕机了,此时你是无法通知给消费者,这样事务的一致性就丢失了,

所以,这个half消息的作用是你在下单之前,我检查一下你这个RocketMQ服务是否正常,然后RocketMQ也会发送一个结果给生产者,意思是我RocketMQ服务正常,

订单系统在执行本地事务的时候有可能会失败, 此时往RocketMQ发送一个rollback状态, 那么RocketMQ就会丢弃掉这个原本准备发给消费者的消息,这样就保证了事务的一致性了.

为什么要来状态回查呢?回查有什么用的

因为有一种情况就是 你生产者在操作事务的时候,比如说生产者下单访问MySQL很慢怎么办,你这个下单可能要执行5秒钟, 此时RocketMQ已经等待你返回本地事务状态了,肯定不能长时间的等待,只能是先把事务状态收回来,此时是unknown状态,然后过段时间再来回查一下你本地事务,看看是否已经commit了,

所以正常情况下就是订单系统下单之后会先给RocketMQ返回一个unknown状态,此时RocketMQ会过一段时间,比如说5分钟以内进行回查一下你生产者的支付系统对账是否已经commit状态了因为你支付系统可能对接第三方银行.

此时当你完成支付操作了,此时RocketMQ才会把消息给消费者.

所以你整个流程的作用是保证你生产者的消息的本地事务和发消息给RocketMQ broker是一致的.也就是要么生产者的所有的数据库操作事务和生产者发送给MQ的消息一起成功,要么就是一起失败.

这个事务消息相当于延迟队列,但是和延迟队列还有区别

half消息是怎么不让消费者看见的

做法也比较朴素,就是把half消息发到 RMQ_SYS_TRANS_HALF_TOPIC 里面去,而我们Comsumer绑定的是业务的topic,这样肯定是看不到的,所以就保证了half消息不被消费者看到.

你发送的是普通消息,只不过RocketMQ内部实现的时候把这个消息变成了half消息,暂存到了RMQ_SYS_TRANS_HALF_TOPIC 这个系统topic里面去,当你生产者执行完了事务再给你的消息转移到你原本要发送的topic里面去.

消费者事务执行失败了怎么办?

需要注意的是,事务消息只是保证了整个分布式事务的一半儿, 也就是事务消息只能保证 生产者的本地事务和生产者发送给MQ消息是一致性的, 也就是说要么同时成功要么同时失败. 不会出现生产者事务失败了,然后消息还让RocketMQ接收到了commit状态了.

事务消息的性能

事务消息的性能肯定要比普通消息性能要差

事务消息的使用限制

下面内容出自 图灵学院的讲义

  1. 1、事务消息不支持延迟消息和批量消息。
  2. 2、为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 `transactionCheckMax`参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = `transactionCheckMax` Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 `AbstractTransactionCheckListener` 类来修改这个行为。

回查次数是由BrokerConfig.transactionCheckMax这个参数来配置的,默认15次,可以在broker.conf中覆盖。
然后实际的检查次数会在message中保存一个用户属性MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES。这个属性值大于transactionCheckMax,就会丢弃。 这个用户属性值会按回查次数递增,也可以在Producer中自行覆盖这个属性。

  1. 3、事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 `transactionMsgTimeout` 参数。

由BrokerConfig.transactionTimeOut这个参数来配置。默认6秒,可以在broker.conf中进行修改。
另外,也可以给消息配置一个MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS属性来给消息指定一个特定的消息回查时间。
msg.putUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, “10000”); 这样就是10秒。

  1. 4、事务性消息可能不止一次被检查或消费。
  2. 5、提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
  3. 6、事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。

代码演示

TransactionListener

  1. package org.apache.rocketmq.example.transaction;
  2. import org.apache.commons.lang3.StringUtils;
  3. import org.apache.rocketmq.client.producer.LocalTransactionState;
  4. import org.apache.rocketmq.client.producer.TransactionListener;
  5. import org.apache.rocketmq.common.message.Message;
  6. import org.apache.rocketmq.common.message.MessageExt;
  7. import java.util.concurrent.ConcurrentHashMap;
  8. import java.util.concurrent.atomic.AtomicInteger;
  9. /**
  10. * 实现了 TransactionListener接口
  11. */
  12. public class TransactionListenerImpl implements TransactionListener {
  13. private final AtomicInteger transactionIndex = new AtomicInteger(0);
  14. private final ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
  15. /**
  16. * 执行本地事务
  17. *
  18. * @param msg
  19. * @param arg
  20. * @return
  21. */
  22. @Override
  23. public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
  24. //获取tags
  25. String tags = msg.getTags();
  26. // 如果是TagA就 提交消息
  27. if (StringUtils.contains(tags, "TagA")) {
  28. return LocalTransactionState.COMMIT_MESSAGE;
  29. //如果是 TagB 就回滚消息
  30. } else if (StringUtils.contains(tags, "TagB")) {
  31. return LocalTransactionState.ROLLBACK_MESSAGE;
  32. } else {
  33. //如果是别的 tag 就
  34. return LocalTransactionState.UNKNOW;
  35. }
  36. }
  37. /**
  38. * 检查本地事务
  39. *
  40. * @param msg
  41. * @return
  42. */
  43. @Override
  44. public LocalTransactionState checkLocalTransaction(MessageExt msg) {
  45. // 拿到消息的tag
  46. String tags = msg.getTags();
  47. // 包含 TagC的就直接 commit操作
  48. if (StringUtils.contains(tags, "TagC")) {
  49. return LocalTransactionState.COMMIT_MESSAGE;
  50. } else if (StringUtils.contains(tags, "TagD")) {
  51. return LocalTransactionState.ROLLBACK_MESSAGE;
  52. } else {
  53. return LocalTransactionState.UNKNOW;
  54. }
  55. }
  56. }

生产者

  1. package org.apache.rocketmq.example.transaction;
  2. import org.apache.rocketmq.client.exception.MQClientException;
  3. import org.apache.rocketmq.client.producer.SendResult;
  4. import org.apache.rocketmq.client.producer.TransactionListener;
  5. import org.apache.rocketmq.client.producer.TransactionMQProducer;
  6. import org.apache.rocketmq.common.message.Message;
  7. import org.apache.rocketmq.remoting.common.RemotingHelper;
  8. import java.io.UnsupportedEncodingException;
  9. import java.util.concurrent.*;
  10. public class TransactionProducer {
  11. public static void main(String[] args) throws MQClientException, InterruptedException {
  12. TransactionListener transactionListener = new TransactionListenerImpl();
  13. //专有的 TransactionMQProducer
  14. TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
  15. producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
  16. ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
  17. @Override
  18. public Thread newThread(Runnable r) {
  19. Thread thread = new Thread(r);
  20. thread.setName("client-transaction-msg-check-thread");
  21. return thread;
  22. }
  23. });
  24. producer.setExecutorService(executorService);
  25. producer.setTransactionListener(transactionListener);
  26. producer.start();
  27. String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
  28. //发了10个消息,其中两个消息是 TagA ,两个消息是TagB,两个消息是TagC,两个消息是TagD,两个消息是TagE
  29. for (int i = 0; i < 10; i++) {
  30. try {
  31. Message msg =
  32. new Message("TopicTest2", tags[i % tags.length], "KEY" + i,
  33. ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
  34. // 按事务的方式发送消息.
  35. SendResult sendResult = producer.sendMessageInTransaction(msg, null);
  36. System.out.printf("%s%n", sendResult);
  37. Thread.sleep(10);
  38. } catch (MQClientException | UnsupportedEncodingException e) {
  39. e.printStackTrace();
  40. }
  41. }
  42. for (int i = 0; i < 100000; i++) {
  43. Thread.sleep(1000);
  44. }
  45. producer.shutdown();
  46. }
  47. }

消费者

  1. package org.apache.rocketmq.example.transaction;
  2. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  4. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  5. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  6. import org.apache.rocketmq.client.exception.MQClientException;
  7. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  8. import org.apache.rocketmq.common.message.MessageExt;
  9. import java.util.List;
  10. import java.util.concurrent.atomic.AtomicLong;
  11. public class Consumer {
  12. public static void main(String[] args) throws InterruptedException, MQClientException {
  13. //使用指定的消费者组名称实例化
  14. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
  15. consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
  16. /*从上次偏移量开始消耗*/
  17. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  18. //再订阅一个主题来消费
  19. consumer.subscribe("TopicTest2", "*");
  20. AtomicLong atomicLong = new AtomicLong(1); //创建一个计数器,初始值是1
  21. //注册回调以在从代理获取的消息到达时执行
  22. consumer.registerMessageListener(new MessageListenerConcurrently() {
  23. @Override
  24. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  25. ConsumeConcurrentlyContext context) {
  26. System.out.printf("%s 收到新消息:%s %n", Thread.currentThread().getName(), msgs);
  27. System.out.println("统计当前接收到了消息的个数" + atomicLong.getAndIncrement());
  28. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  29. }
  30. });
  31. //启动消费者实例
  32. consumer.start();
  33. System.out.printf("Consumer Started.%n");
  34. }
  35. }

执行结果以及说明

先启动消费者,再启动生产者

生产者控制台:

生产者控制台就不发了, 就是发了10个消息,其中两个消息是 TagA ,两个消息是TagB,两个消息是TagC,两个消息是TagD,两个消息是TagE

消费者控制台:

说明 , 可以看到2021年10月24日19时2分钟54秒收到了两个TAGS都是 TagA的消息.

然后2021年10月24日19时3分钟10秒的时候收到了一个TAGS=TagC的消息,

然后2021年10月24日19时3分钟13秒的时候收到了一个TAGS=TagC多消息.

为什么会是这样的输出结果呢? 原因是因为TransactionListenerImpl 的executeLocalTransaction那里配置了,如果tags包含了”TagA” 就进行本地事务commit操作,所以这样RocketMQ就会将消息直接发送给Consumer,所以Consumer就能立马接收到了两个TAGS=TagA的消息.

为什么隔了几十秒才接收到了TAGS=TagC的消息?

原因是因为TransactionListenerImpl的executeLocalTransaction方法那里如果tags不是”TagA”或者”TagB”的时候,就会返回给RocketMQ的本地事务状态为unknow. 此时RocketMQ会在十几秒之后进行回查事务状态操作, 回查事务状态的时候会执行TransactionListenerImpl的checkLocalTransaction方法,在checkLocalTransaction方法里面的逻辑是如果tags包含了”TagC”就发送给RocketMQ一个本地事务状态为commit状态,这样RocketMQ在接收到了这个commit状态之后,就会将这个消息发送给Consumer了,这样Consumer就接收到了TAGS=TagC的消息

为什么没有接收到TAGS=TagB的消息,因为在TransactionListenerImpl的executeLocalTransaction方法里面逻辑是 如果tags包含了”TagB”就会发送rollback的本地事务状态给RocketMQ,RocketMQ接收到了rollback状态之后就会把这个消息丢弃掉,这样tags包含”TagB”的消息就不会被Consumer消费.

  1. Consumer Started.
  2. 当前时间是: 20211024192分钟54 , 当前线程为ConsumeMessageThread_1 收到新消息:[MessageExt [brokerName=broker-a, queueId=2, storeSize=300, queueOffset=0, sysFlag=8, bornTimestamp=1635073374460, bornHost=/172.16.10.1:53725, storeTimestamp=1635073374338, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000001C72FA03, commitLogOffset=477297155, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=477296841, toString()=Message{topic='TopicTest2', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest2, MAX_OFFSET=1, KEYS=KEY0, TRAN_MSG=true, CONSUME_START_TIME=1635073374486, UNIQ_KEY=AC100A013FD818B4AAC27A88A0FB0000, CLUSTER=rocketmq-cluster, WAIT=true, PGROUP=please_rename_unique_group_name, TAGS=TagA, REAL_QID=2}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='AC100A013FD818B4AAC27A88A0FB0000'}]]
  3. 统计当前接收到了消息的个数1
  4. 当前时间是: 20211024192分钟54 , 当前线程为ConsumeMessageThread_2 收到新消息:[MessageExt [brokerName=broker-b, queueId=3, storeSize=300, queueOffset=1, sysFlag=8, bornTimestamp=1635073374539, bornHost=/172.16.10.1:53726, storeTimestamp=1635073374415, storeHost=/172.16.10.103:10911, msgId=AC100A6700002A9F0000000000258870, commitLogOffset=2459760, bodyCRC=1424393152, reconsumeTimes=0, preparedTransactionOffset=2459446, toString()=Message{topic='TopicTest2', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest2, MAX_OFFSET=2, KEYS=KEY5, TRAN_MSG=true, CONSUME_START_TIME=1635073374545, UNIQ_KEY=AC100A013FD818B4AAC27A88A14B0005, CLUSTER=rocketmq-cluster, WAIT=true, PGROUP=please_rename_unique_group_name, TAGS=TagA, REAL_QID=3}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53], transactionId='AC100A013FD818B4AAC27A88A14B0005'}]]
  5. 统计当前接收到了消息的个数2
  6. 当前时间是: 20211024193分钟10 , 当前线程为ConsumeMessageThread_3 收到新消息:[MessageExt [brokerName=broker-b, queueId=0, storeSize=326, queueOffset=1, sysFlag=8, bornTimestamp=1635073374496, bornHost=/172.16.10.1:53726, storeTimestamp=1635073390565, storeHost=/172.16.10.103:10911, msgId=AC100A6700002A9F0000000000258F75, commitLogOffset=2461557, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=2460537, toString()=Message{topic='TopicTest2', flag=0, properties={TRANSACTION_CHECK_TIMES=1, TRAN_MSG=true, CONSUME_START_TIME=1635073390694, MIN_OFFSET=0, REAL_TOPIC=TopicTest2, MAX_OFFSET=2, KEYS=KEY2, UNIQ_KEY=AC100A013FD818B4AAC27A88A1200002, CLUSTER=rocketmq-cluster, WAIT=true, PGROUP=please_rename_unique_group_name, TAGS=TagC, REAL_QID=0}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50], transactionId='AC100A013FD818B4AAC27A88A1200002'}]]
  7. 统计当前接收到了消息的个数3
  8. 当前时间是: 20211024193分钟13 , 当前线程为ConsumeMessageThread_4 收到新消息:[MessageExt [brokerName=broker-a, queueId=1, storeSize=326, queueOffset=1, sysFlag=8, bornTimestamp=1635073374565, bornHost=/172.16.10.1:53725, storeTimestamp=1635073393360, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000001C730AE4, commitLogOffset=477301476, bodyCRC=988340972, reconsumeTimes=0, preparedTransactionOffset=477300456, toString()=Message{topic='TopicTest2', flag=0, properties={TRANSACTION_CHECK_TIMES=1, TRAN_MSG=true, CONSUME_START_TIME=1635073393498, MIN_OFFSET=0, REAL_TOPIC=TopicTest2, MAX_OFFSET=2, KEYS=KEY7, UNIQ_KEY=AC100A013FD818B4AAC27A88A1650007, CLUSTER=rocketmq-cluster, WAIT=true, PGROUP=please_rename_unique_group_name, TAGS=TagC, REAL_QID=1}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55], transactionId='AC100A013FD818B4AAC27A88A1650007'}]]
  9. 统计当前接收到了消息的个数4

码云代码地址

https://gitee.com/zjj19941/ZJJ_RocketMQ/tree/master/TuLing4-RocketMQ-Demo/rocketMQ-API/src/main/java/org/apache/rocketmq/example/transaction