有一些第三方的MQ是支持事务消息的,比如RocketMQ,他们支持事务消息的方式也是类似于采用的二阶段提交,但是市面上的一些主流的MQ都是不支持事务消息的,比如RabbitMQ和Kafka都不支持。
我们知道事务消息。可以想象一下,如果没有事务消息的话,我们用普通消息如何保证最终一致性。
普通消息先发消息方式
假如分为两步操作
1、发送消息,发送消息成功(发送消息通知B系统给B增加50元)
2、执行本地事务(扣除A账户50元)
异常情况 | 结果 | |
---|---|---|
发送消息失败 | 不进行2步骤执行 | |
发送消息成功 | 进行本地事务执行,步骤2执行如果失败,需要通知B系统去回滚给B增加的50元 |
普通消息先进行本地事务执行
事务也有执行的先后顺序,A转账给B,至少要等A扣除钱了,才能通知B系统,这才是正常的流程。现在更改如下
假如分为两步操作
1、执行本地事务(扣除A账户50元)
2、发送消息,发送消息成功(发送消息通知B系统给B增加50元)
异常情况 | 结果 | |
---|---|---|
本地事务执行失败 | 不进行2步骤执行 | |
本地事务执行成功,消息发送失败 | 消息发送失败,要么补发消息,要么进行A事务回滚 |
两种情景,在设计的过程中,其实更偏向与第二种方式,因为它更符合正常的逻辑顺序 第一种方式需要跨B系统进行回滚事务,操作性更复杂 第二种方式,要么回滚,要么补发消息。操作的逻辑完全在A系统这边,操作起来更容易。但是面临回滚操作,当前只有B一个系统,如果此处操作涉及5个系统,其中一个环节出了问题,都要进行相应的回滚。系统越多,回滚逻辑也就越复杂。所以普通消息保证事务可以完成,但是需要做很多额外的操作保证回滚,或者补发,达到最终一致性的效果
事务消息
事务消息依赖于支持“事务消息”的消息队列,其基本思想是 利用消息中间间实施两阶段提交,将本地事务和发消息放在了一个分布式事务里,保证要么本地操作成功成功并且对外发消息成功,要么两者都失败。流程如下:
- 主事务向消息队列发送预备消息
- 主事务收到ACK之后本地执行主事务
- 根据执行的结果(成功或失败)向消息队列发送提交或者回滚消息
详细的流程如下图所示:
事务消息作为一种异步确保型事务, 将两个事务分支通过 MQ 进行异步解耦,RocketMQ 事务消息的设计流程同样借鉴了两阶段提交理论,整体交互流程如下图所示:
- 事务发起方首先发送 prepare 消息到 MQ。
- 在发送 prepare 消息成功后执行本地事务。
- 根据本地事务执行结果返回 commit 或者是 rollback。
- 如果消息是 rollback,MQ 将删除该 prepare 消息不进行下发,如果是 commit 消息,MQ 将会把这个消息发送给 consumer 端。
- 如果执行本地事务过程中,执行端挂掉,或者超时,MQ 将会不停的询问其同组的其他 producer 来获取状态。
- Consumer 端的消费成功机制有 MQ 保证。
源码如下:
public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg)
throws MQClientException {
SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
try {
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}
源码解析:
1、事务发起方首先发送 prepare 消息到 MQ。
2、如果prepare消息发送成功,则进行本地事务的实现LocalTransactionExecuter的执行
3、如果prepare消息发送失败,则设置LocalTransactionState.ROLLBACK_MESSAGE状态,进行回滚操作
4、MQ server会根据LocalTransactionState状态去commit/rollback这条prepare消息
5、如果commit,则发送到消费方去消费这条消息
6、如果MQ server没有收到LocalTransactionState,broker定时回查本地事务的执行结果。回查频率默认1分钟,如果超过检测次数(默认15次),消息会默认为丢弃,即回滚消息。回查需要实现TransactioCheckListener类。