可靠消息最终一致性方案核心流程
角色分类
上游服务投递消息
(1、首先,上游服务需要发送一条消息给可靠消息服务。(可以认为是对下游服务一个接口的调用,里面包含了对应的一些参数)。
(2、然后,可靠消息服务就把这条消息存储到自己的数据库里去,状态为【待确认】。
(3、接着,上游服务就可以执行自己本地的数据库操作,根据自己的执行结果,再次调用可靠消息服务的接口。
(4、如果本地数据库操作执行成功,那么就找可靠消息服务确认那条消息。如果本地数据库操作失败了,那么就找可靠消息服务删除那条消息。
(5、此时如果是确认消息,那么可靠消息服务就把数据库里的消息状态更新为【已发送】,同时将消息发送给MQ(更新数据库里的消息状态和投递消息给MQ,两个操作需要放在一个方法里,而且得开启本地事务。)——— 如果数据库更新消息失败了,那么就抛出异常退出,就别投递给MQ。
——— 如果投递给MQ报错了,那么就要抛出异常让本地数据库事务回滚。
——— 这两个操作必须得一起成功,或者一起失败。
(5、如果上游服务是通知删除消息的,那么就在可靠消息服务里删除这条消息。
下游服务接收消息
(1、 下游服务就一直等着从MQ消费消息就好了,如果消费了消息,那么就操作自己的本地数据库。
(2、如果操作成功了,就反过来通知可靠消息服务,说自己处理成功了,然后可靠消息服务就会把消息的状态设置为【已完成】。
如何保证上游服务对消息的100%可靠投递
(1) 如果是上游服务给可靠消息服务发送待确认消息的过程的过程出错了。此时上游服务是可以感知到调用异常的,就不用执行后续的流程。
(2)如果是上游服务操作完本地数据库之后,通知可靠消息服务的确认消息或者删除消息的时候,出现了问题(比如是没通知成功,或者可靠消息没执行【已发送】成功,或者是可靠消息服务没有成功的投递消息给MQ)。
—— 此时遇到上述的情况下,因为这条消息在可靠消息服务的数据库里的状态就一直是【待确认】的状态。
—— 此时,可以在可靠消息服务里开发一个后台定时运行的线程,不停的检查各个消息的状态,如果一直是【待确认】的状态,就会认为这个消息出现了什么问题。这时就可以回调上游服务提供的一个接口,确认上游服务对这个消息的数据库操作,是否执行成功了。如果执行成功了,那么可靠消息服务将消息状态修改为【已发送】,同时投递消息给MQ(这两个操作需要在一个事务里面)。如果上游服务回复的结果是执行失败,那么可靠消息就将数据库中的消息删除即可。
通过上面的机制,就可以保证,可靠消息服务一定会尝试完成消息到MQ的投递。
如何保证下游服务对消息100%可靠接收
(1)如果下游服务消费出了问题,没消费到?或者是下游服务对消息处理失败了。
(2)会在可靠消息服务里开启一个后台线程,不断检查消息状态。如果消息状态一直是【已发送】,始终没有变成【已完成】,就说明下游服务始终没有处理成功。
(3)此时可靠消息服务就可以再次尝试投递到MQ,让下游服务再次处理。
(4)只要下游服务的接口逻辑实现幂等性(保证多次处理一个消息的时候,不会插入重复的数据即可)。