在RocketMQ中,有一个非常强悍有力的功能,就是事务消息的功能,凭借这个事务级的消息机制,就可以让我们确保生产者系统推送给出去的消息一定会成功写入MQ里,绝对不会半路就搞丢了。
发送half消息到MQ去,试探一下MQ是否正常
在基于RocketMQ的事务消息机制中,我们首先要让生产者系统去发送一条half消息到MQ去,这个half消息本质就是一个消息,只不过你可以理解为他这个消息的状态是half状态,这个时候消费者系统是看不见这个half消息的。然后我们去等待接收这个half消息写入成功的响应通知。大概这个意思就是说,确认一下MQ还活着,MQ也知道你后续可能想发送一条很关键的不希望丢失的消息给他了!万一要是half消息写入失败了呢?
这里我们先来分析第一种情况,万一要是你生产者系统写half消息给MQ就失败了呢?这个时候你的生产者系统就应该执行一系列的回滚操作。half消息成功之后,生产者系统完成自己的任务
接着我们来考虑第二种情况,你的half消息写成功了,这个时候你应该干什么呢?这个时候你的生产者系统就应该在自己本地的数据库里执行一些增删改操作了,因为一旦half消息写成功了,就说明MQ肯定已经收到这条消息了,MQ还活着,而且目前你是可以跟MQ正常沟通的。如果生产者系统的本地事务执行失败了怎么办?
接着我们继续看下一种情况,万一要是生产者系统更新自己的数据库失败了怎么办?这个时候直接就是让生产者系统发送一个rollback请求给MQ就可以了。这个意思就是说,你可以把之前我发给你的half消息给删除掉了,因为我自己这里都出问题了,已经无力跟你继续后续的流程了。如果生产者系统完成了本地事务之后,接着干什么?
如果生产者系统成功完成了本地的事务操作,此时你就可以发送一个commit请求给MQ,要求让MQ对之前的half消息进行commit操作,让消费者系统可以看见这个消息。所谓的half消息实际就是一个消息,只不过他的状态是half。如果发送half消息成功了,但是没收到响应呢?
这个时候我们没收到响应,可能就会网络超时报错,也可能直接有其他的异常错误,这个时候生产者系统会误以为是发送half消息到MQ失败。
但这个时候MQ已经存储下来一条half消息了,那对这个消息怎么处理?其实RocketMQ这里有一个补偿流程,他会去扫描自己处于half状态的消息,如果我们一直没有对这个消息执行commit/rollback操作,超过了一定的时间,他就会回调你的生产者系统的一个接口。他会问问你:这个消息到底怎么回事?你到底是打算commit这个消息还是要rollback这个消息?这个时候我们的生产者系统就得去查一下数据库,判断消息在生产者系统是否执行成功,如果未执行成功,此时就知道,你必然得发送rollback请求给MQ去删除之前那个half消息了!如果rollback或者commit发送失败了呢?
我们再假设一种场景,如果生产者系统是收到了half消息写入成功的响应了,同时尝试对自己的数据库更新了,然后根据失败或者成功去执行了rollback或者commit请求,发送给MQ了,结果因为网络故障,导致rollback或者commit请求发送失败了呢?
这个时候其实也很简单,因为MQ里的消息一直是half状态,所以说他过了一定的超时时间会发现这个half消息有问题,他会回调你的生产者系统的接口。你此时要判断一下,这个生产者系统执行成功当前消息的操作,那你就得再次执行commit请求,反之则再次执行rollback请求。
本质这个MQ的回调就是一个补偿机制,如果你的half消息响应没收到,或者rollback、commit请求没发送成功,他都会来找你问问对half消息后续如何处理。
再假设一种场景,如果生产者系统收到了half消息写入成功的响应了,同时尝试对自己的数据库更新了,然后根据失败或者成功去执行了rollback或者commit请求,发送给MQ了。很不巧,mq在这个时候挂掉了,导致rollback或者commit请求发送失败,怎么办?如果是这种情况的话,那就等mq自己重启了,重启之后他会扫描half消息,然后还是通过上面说到的补偿机制,去回调你的接口。事务消息机制的底层实现原理
其实写入一个Topic,最终是定位到这个Topic的某个MessageQueue,然后定位到一台Broker机器上去,然后写入的是Broker上的CommitLog文件,同时将消费索引写入MessageQueue对应的ConsumeQueue文件。
如果你写入一条half消息到Topic里去,会定位到这个Topic的一个MessageQueue,然后定位到RocketMQ的一台机器上去,接着按理说,消息会写入CommitLog。同时消息的offset会写入MessageQueue对应的ConsumeQueue,这个ConsumeQueue是属于Topic的,然后消费者系统按理说会从这个ConsumeQueue里获取到你写入的这个half消息。
但是实际上消费者系统却没法看到这条消息,其本质原因就是RocketMQ一旦发现你发送的是一个half消息,他不会把这个half消息的offset写入Topic的ConsumeQueue里去。
他会把这条half消息写入到自己内部的“RMQ_SYS_TRANS_HALF_TOPIC”这个Topic对应的一个ConsumeQueue里去。
真相大白了,所以对于事务消息机制之下的half消息,RocketMQ是写入内部Topic的ConsumeQueue的,不是写入你指定的Topic的ConsumeQueue的。
结合上面的内容,可以清晰判断出,必须要half消息进入到RocketMQ内部的RMQ_SYS_TRANS_HALF_TOPIC的ConsumeQueue文件了,此时就会认为half消息写入成功了,然后就会返回响应给生产者系统。所以这个时候,一旦你的生产者系统收到这个half消息写入成功的响应,必然就知道这个half消息已经在RocketMQ内部了。
假如因为网络故障,生产者系统没收到half消息的响应,或者说自己发送的rollback/commit请求失败了,那么RocketMQ会干什么?其实这个时候他会在后台有定时任务,定时任务会去扫描RMQ_SYS_TRANS_HALF_TOPIC中的half消息,如果你超过一定时间还是half消息,他会回调生产者系统的接口,让你判断这个half消息是要rollback还是commit。
如果执行rollback操作的话,如何标记消息回滚?假设我们的生产者系统执行了rollback请求,那么此时就需要对消息进行回滚。因为RocketMQ都是顺序把消息写入磁盘文件的,所以在这里如果你执行rollback,他的本质就是用一个OP操作来标记half消息的状态,RocketMQ内部有一个OP_TOPIC,此时可以写一条rollback OP记录到这个Topic里,标记某个half消息是rollback了。假设你一直没有执行commit/rollback,RocketMQ会回调生产者系统的接口去判断half消息的状态,但是他最多就是回调15次,如果15次之后你都没法告知他half消息的状态,就自动把消息标记为rollback。
如果执行commit操作,如何让消息对消费者系统可见?其实也很简单,你执行commit操作之后,RocketMQ就会在OP_TOPIC里写入一条记录,标记half消息已经是commit状态了。
接着需要把放在RMQ_SYS_TRANS_HALF_TOPIC中的half消息给写入到Topic的ConsumeQueue里去,然后我们的消费者系统可以就可以看到这条消息进行消费了。
事务消息机制的本质都是基于CommitLog、ConsumeQueue这套存储机制来做的,只不过中间有一些Topic的变换,half消息可能就是写入内部Topic的。
