Seata 是什么
Seata 是阿里近期开源的分布式事务框架,地址:https://github.com/seata/seata。框架包括了集团的 TXC(云版本叫 GTS)和蚂蚁金服的 TCC 两种模式,短短数月 Github 上的 star 数已经接近一万,算是目前唯一有大厂背书的分布式事务解决方案。
TXC 在 Seata 中又叫 AT 模式,意为补偿方法是框架自动生成的,对用户完全屏蔽,用户可以向使用本地事务那样使用分布式事务,缺点是仅支持关系型数据库(目前支持 MySQL),引入 Seata AT 的服务需要本地建表存储 rollback_info,隔离级别默认 RU 适用场景有限。
TCC 不算是新概念,很早就有了,用户通过定义 try/confirm/cancel 三个方法在应用层面模拟两阶段提交,区别在于 TCC 中 try 方法也需要操作数据库进行资源锁定,后续两个补偿方法由框架自动调用,分别进行资源提交和回滚,这点同单纯的存储层 2PC 不太一样。蚂蚁金服向 Seata 贡献了自己的 TCC 实现,据说已经演化了十多年,大量应用在在金融、交易、仓储等领域。
分布式事务的诞生背景
早期应用都是单一架构,例如支付服务涉及到的账户、金额、订单系统等都由单一应用负责,底层访问同一个数据库实例,自然事务操作也是本地事务,借助 Spring 可以轻松实现;但是由于量级越来越大,单一服务需要进行职责拆分变为三个独立的服务,通过 RPC 进行调用,数据也存在不同的数据库实例中,由于这时一次业务操作涉及对多个数据库数据的修改,无法再依靠本地事务,只能通过分布式事务框架来解决。

TCC 就是分布式事务的一种解决方案,属于柔性补偿型,优点在于理解简单、仅 try 阶段加锁并发性能较好,缺点在于代码改造成本。
什么是 TCC 本文就不再赘述了,TCC 的概念本身并不复杂
Seata TCC 使用方法
在分析源码之前,我们先简要提及下 Seata TCC 模式的使用方法,有助于后续理解整个 TCC 流程。
Seata TCC 参与方
Seata 中的 TCC 模式要求 TCC 服务的参与方在接口上增加 @TwoPhaseBusinessAction 注解,注明 TCC 接口的名称(全局唯一),TCC 接口的 confirm 和 cancel 方法的名称,用于后续框架反射调用,下面是一个 TCC 接口的案例:
public interface TccAction {@TwoPhaseBusinessAction(name = "yourTccActionName", commitMethod = "confirm", rollbackMethod = "cancel")public boolean try(BusinessActionContext businessActionContext, int a, int b);public boolean confirm(BusinessActionContext businessActionContext);public boolean cancel(BusinessActionContext businessActionContext);}
紧接着定义实现类 Impl 实现这个接口,为三个方法提供具体实现。最后将参与方服务进行发布,注册到远端,主要为了后续能让 Seata 框架调用到参与方的 confirm 或者 cancel 方法闭环整个 TCC 事务。
Seata TCC 发起方
Seata TCC 的发起方类似于我们上图中的 payment service,参与方需要在业务方法上增加 @GlobalTransactional 注解,用于开启切面注册全局事务,业务方法中调用 TCC 参与方的若干 try 方法,一旦业务方法调用成功,Seata 框架会通知 TC 回调这些参与方的 confirm 和 cancel 方法。
源码分析
Seata 中 TCC 模式的源码并不复杂,主要集中于:
| module | class | 功能 |
|---|---|---|
| seata-spring | GlobalTransactionalInterceptor.class | 全局事务切面逻辑,包括注册全局事务,拿到 xid |
| seata-spring | TccActionInterceptor.class | TCC 参与方切面逻辑 |
| seata-tcc | TCCResourceManager.class | 解析 TCC Bean,保存 TCC Resources,便于后续回调 |
| seata-tcc | ActionInterceptorHandler.class | TCC 分支事务注册实现 |
| seata-server | DefaultCoordinator.class、FileTransactionStoreManager.class | 主要是 TC 的实现、事务存储等实现 |
注册 TCC Resources
Seata 中一个 TCC 接口被称作一个 TCC Resources,其结构如下:
public class TCCResource implements Resource {private String resourceGroupId = "DEFAULT";private String appName;private String actionName; // TCC 接口名称private Object targetBean; // TCC Beanprivate Method prepareMethod; // try 方法private String commitMethodName;private Method commitMethod; // confirm 方法private String rollbackMethodName;private Method rollbackMethod; // cancel 方法// …… 省略}
Seata 解析到应用中存在 TCC Bean,则通过 parserRemotingServiceInfo 方法生成一个 TCCResource 对象,进而调用 TCCResourceManager 类的 registerResource 方法,将 TCCResource 对象保存到本地的 tccResourceCache 中,它是一个 ConcurrentHashMap 结构,同时通过 RmRpcClient 将该 TCCResource 的 resourceId、address 等信息注册到服务端,便于后续 TC 通过 RPC 回调到正确的地址。
// 解析 TCCResource 的部分代码Class<?> interfaceClass = remotingBeanDesc.getInterfaceClass();Method[] methods = interfaceClass.getMethods();if(isService(bean, beanName)){try {// 如果是 TCC service Bean,解析并注册该 resourceObject targetBean = remotingBeanDesc.getTargetBean();for(Method m : methods){TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class);if(twoPhaseBusinessAction != null){// 如果有 TCC 参与方注解,定义一个 TCCResource,TCCResource tccResource = new TCCResource();tccResource.setActionName(twoPhaseBusinessAction.name());// TCC BeantccResource.setTargetBean(targetBean);// try 方法tccResource.setPrepareMethod(m);// confirm 方法名称tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod());// confirm 方法对象tccResource.setCommitMethod(ReflectionUtil.getMethod(interfaceClass, twoPhaseBusinessAction.commitMethod(), new Class[]{BusinessActionContext.class}));// cancel 方法名称tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod());// cancel 方法对象tccResource.setRollbackMethod(ReflectionUtil.getMethod(interfaceClass, twoPhaseBusinessAction.rollbackMethod(), new Class[]{BusinessActionContext.class}));// 调用到 TCCResourceManager 的 registerResource 方法DefaultResourceManager.get().registerResource(tccResource);}}}catch (Throwable t){throw new FrameworkException(t, "parser remting service error");}}
我们看一下 TCCResourceManager 的 registerResource 方法的实现:
// 内存中保存的 resourceId 和 TCCResource 的映射关系private Map<String, Resource> tccResourceCache = new ConcurrentHashMap<String, Resource>();@Overridepublic void registerResource(Resource resource) {TCCResource tccResource = (TCCResource) resource;tccResourceCache.put(tccResource.getResourceId(), tccResource);// 调用父类的方法通过 RPC 注册到远端super.registerResource(tccResource);}
我们看下 TCCResource 是如何注册到服务端的:
public void registerResource(Resource resource) {// 拿到 RmRpcClient 实例,调用其 registerResource 方法RmRpcClient.getInstance().registerResource(resource.getResourceGroupId(), resource.getResourceId());}public void registerResource(String resourceGroupId, String resourceId) {if (LOGGER.isInfoEnabled()) {LOGGER.info("register to RM resourceId:" + resourceId);}synchronized (channels) {for (Map.Entry<String, Channel> entry : channels.entrySet()) {String serverAddress = entry.getKey();Channel rmChannel = entry.getValue();if (LOGGER.isInfoEnabled()) {LOGGER.info("register resource, resourceId:" + resourceId);}// 注册 resourceId,远端将其解析为一个 RpcContext 保存在内存中sendRegisterMessage(serverAddress, rmChannel, resourceId);}}}
GlobalTransaction 注册全局事务
GlobalTransaction 注解是全局事务的入口,其切面逻辑实现在 GlobalTransactionalInterceptor 类中。如果判断进入 @GlobalTransaction 修饰的方法,会调用 handleGlobalTransaction 方法进入切面逻辑,其中关键方法是 transactionalTemplate 的 execute 方法。
public Object execute(TransactionalExecutor business) throws Throwable {// 如果上游已经有 xid 传过来说明自己是下游,直接参与到这个全局事务中就可以,不必新开一个,角色是 Participant// 如果上游没有 xid 传递过来,说明自己是发起方,新开启一个全局事务,角色是 LauncherGlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();// …… …… 省略try {// 开启全局事务beginTransaction(txInfo, tx);Object rs = null;try {// 调用业务方法rs = business.execute();} catch (Throwable ex) {// 如果抛异常,通知 TC 回滚全局事务completeTransactionAfterThrowing(txInfo,tx,ex);throw ex;}// 如果不抛异常,通知 TC 提交全局事务commitTransaction(tx);return rs;}// …… …… 省略}
beginTransaction 方法调用了 transactionManager 的 begin 方法:
// 客户端@Overridepublic String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {GlobalBeginRequest request = new GlobalBeginRequest();request.setTransactionName(name);request.setTimeout(timeout);// 发送 RPC,获取 TC 下发的 xidGlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);return response.getXid();}// 服务端@Overridepublic String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {// 全局事务用 GlobalSession 来表示GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout);session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());// 将 GlobalSession 写入文件存储session.begin();// 返回 UUID 作为全局事务 IDreturn XID.generateXID(session.getTransactionId());}
TwoPhaseBusinessAction 注册分支事务
全局事务调用业务方法时,会进入 TCC 参与方的切面逻辑,主要实现在 TccActionInterceptor 类中,关键方法是 actionInterceptorHandler 的 proceed 方法。
public Map<String, Object> proceed(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction, Callback<Object> targetCallback) throws Throwable {// …… …… 省略// 创建分支事务String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);actionContext.setBranchId(branchId);// 记录方法参数Class<?>[] types = method.getParameterTypes();int argIndex = 0;for (Class<?> cls : types) {if (cls.getName().equals(BusinessActionContext.class.getName())) {arguments[argIndex] = actionContext;break;}argIndex++;}// …… …… 省略}
doTccActionLogStore 方法负责注册分支事务:
// 客户端protected String doTccActionLogStore(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction, BusinessActionContext actionContext) {String actionName = actionContext.getActionName();// 拿到全局事务 IDString xid = actionContext.getXid();// …… …… 省略try {// resourceManager 通过 RPC 向 TC 注册分支事务Long branchId = DefaultResourceManager.get().branchRegister(BranchType.TCC, actionName, null, xid, applicationContextStr, null);// 拿到 TC 返回的分支事务 IDreturn String.valueOf(branchId);}// …… …… 省略}// 服务端@Overridepublic Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,String applicationData, String lockKeys) throws TransactionException {GlobalSession globalSession = assertGlobalSession(XID.getTransactionId(xid), GlobalStatus.Begin);// 分支事务用 BranchSession 表示,新建一个 BranchSessionBranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,applicationData, lockKeys, clientId);if (!branchSession.lock()) {throw new TransactionException(LockKeyConflict);}try {// 将分支事务加入全局事务中,也会写文件globalSession.addBranch(branchSession);} catch (RuntimeException ex) {throw new TransactionException(FailedToAddBranch);}// 返回分支事务 IDreturn branchSession.getBranchId();}
TC 回调参与方补偿方法
分支事务注册完毕,业务方法调用成功则通知 TC 提交全局事务。
@Overridepublic void commit() throws TransactionException {// 如果是参与者,无需发起提交请求if (role == GlobalTransactionRole.Participant) {return;}// 由 TM 向 TC 发出提交全局事务的请求status = transactionManager.commit(xid);}
TC 收到客户端 TM 的 commit 请求后:
@Overridepublic GlobalStatus commit(String xid) throws TransactionException {// 根据 xid 找出 GlobalSessionGlobalSession globalSession = SessionHolder.findGlobalSession(XID.getTransactionId(xid));if (globalSession == null) {return GlobalStatus.Finished;}GlobalStatus status = globalSession.getStatus();// 关闭这个 GlobalSession,不让后续的分支事务再注册上来globalSession.closeAndClean();if (status == GlobalStatus.Begin) {// 修改状态为提交进行中globalSession.changeStatus(GlobalStatus.Committing);// 一旦分支事务中存在 TCC,做同步提交,其实 TCC 分支也可以异步提交,要求高性能时可以选择异步if (globalSession.canBeCommittedAsync()) {asyncCommit(globalSession);} else {doGlobalCommit(globalSession, false);}}return globalSession.getStatus();}
doGlobalCommit 是我们关注的关键方法,我们忽略其中的次要逻辑:
@Overridepublic void doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {for (BranchSession branchSession : globalSession.getSortedBranches()) {// …… …… 省略try {// 调用 DefaultCoordinator 的 branchCommit 方法做分支提交// 参数有分支事务 id,resourceId 用来寻找对应的 TCCResource 和补偿方法参数信息BranchStatus branchStatus = resourceManagerInbound.branchCommit(branchSession.getBranchType(),XID.generateXID(branchSession.getTransactionId()), branchSession.getBranchId(),branchSession.getResourceId(), branchSession.getApplicationData());}}// …… …… 省略}
服务端的 DefaultCoordinator 类中的 branchCommit 方法发出 RPC 请求,调用对应 TCCResource 提供方:
@Overridepublic BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,String applicationData)throws TransactionException {// …… …… 省略// 获取全局事务和分支事务GlobalSession globalSession = SessionHolder.findGlobalSession(XID.getTransactionId(xid));BranchSession branchSession = globalSession.getBranch(branchId);// 根据 resourceId 找到对应的 channel 和 RpcContextBranchCommitResponse response = (BranchCommitResponse)messageSender.sendSyncRequest(resourceId,branchSession.getClientId(), request);// 返回分支事务提交状态return response.getBranchStatus();// …… …… 省略}
客户端自然是接收到分支提交的 RPC 请求,然后本地找出之前解析并保持下来的 TCCResource 进行补偿方法的反射调用,下面我们截取其中的关键步骤进行分析。
@Overridepublic BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {// 根据 resourceId 找出内存中保留的 TCCResource 对象TCCResource tccResource = (TCCResource) tccResourceCache.get(resourceId);if(tccResource == null){throw new ShouldNeverHappenException("TCC resource is not exist, resourceId:" + resourceId);}// 获取 targetBean 和相应的 method 对象Object targetTCCBean = tccResource.getTargetBean();Method commitMethod = tccResource.getCommitMethod();try {boolean result = false;// 取出补偿方法参数信息BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId, applicationData);// 反射调用补偿方法Object ret = commitMethod.invoke(targetTCCBean, businessActionContext);// 返回状态return result ? BranchStatus.PhaseTwo_Committed:BranchStatus.PhaseTwo_CommitFailed_Retryable;}// …… …… 省略}
事务存储
关于 Seata TC 模块如何进行事务存储,网上有的文章已经讲得很详细,例如 深度剖析一站式分布式事务方案 Seata-Server,因此这里不再赘述。
需要提及的一点是,TC 有可能成为整个分布式事务服务的性能瓶颈,因此如何做到高性能和高可用很重要,目前的存储方式是 File,代码中也有关于 DB Store Mode 的 TODO 项,文件相比于 DB 性能肯定好一些但是可用性会差一点,这块怎么保证要等到后续 HA Cluster 发布之后再看。
总结
整个 Seata 框架中关于 TCC 部分的源码并不复杂,本文只选取了部分类中的关键代码进行展示,忽略了一些判断逻辑和异常处理,笔者认为 Seata TCC 中关于 TCC 异常的封装和自定义处理、还有各种用户扩展埋点的设计也值得一看。
蚂蚁 SOFA Channel 之前做过一个关于 Seata TCC Seata TCC 分享 的讲解里也提到,TCC 框架的难点不在于本身,而在于如何写好一个 TCC 接口,如果对这部分内容感兴趣,可以点击链接进行详细了解。
