1 Seata 源码搭建
1.1 环境准备
JDK 1.8
Maven 3.2+
1.2 源码构建
从github上,下载seata的源码到本地,版本选择1.3.0 https://github.com/seata/seata/tree/1.3.0
1.3 导⼊idea⼯程
将lagou-parent⼯程放⼊seata源码中导⼊idea

2. idea⼯程⽬录
3. registry.conf ⽂件替换之前已搭建好的nacos注册中⼼和配置中⼼的⽂件 ```java registry {file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = “nacos” loadBalance = “RandomLoadBalance” loadBalanceVirtualNodes = 10
nacos { application = “seata-server” serverAddr = “106.75.226.220:8848” group = “SEATA_GROUP” namespace = “” cluster = “default” username = “nacos” password = “nacos” } eureka { serviceUrl = “http://localhost:8761/eureka“ application = “default” weight = “1” } redis { serverAddr = “localhost:6379” db = 0 password = “” cluster = “default” timeout = 0 } zk { cluster = “default” serverAddr = “127.0.0.1:2181” sessionTimeout = 6000 connectTimeout = 2000 username = “” password = “” } consul { cluster = “default” serverAddr = “127.0.0.1:8500” } etcd3 { cluster = “default” serverAddr = “http://localhost:2379“ } sofa { serverAddr = “127.0.0.1:9603” application = “default” region = “DEFAULT_ZONE” datacenter = “DefaultDataCenter” cluster = “default” group = “SEATA_GROUP” addressWaitTime = “3000” } file { name = “file.conf” } }
config {
file、nacos 、apollo、zk、consul、etcd3
type = “nacos”
nacos { serverAddr = “106.75.226.220:8848” namespace = “” group = “SEATA_GROUP” username = “nacos” password = “nacos” } consul { serverAddr = “127.0.0.1:8500” } apollo { appId = “seata-server” apolloMeta = “http://192.168.1.204:8801“ namespace = “application” apolloAccesskeySecret = “” } zk { serverAddr = “127.0.0.1:2181” sessionTimeout = 6000 connectTimeout = 2000 username = “” password = “” } etcd3 { serverAddr = “http://localhost:2379“ } file { name = “file.conf” } }
4. 编译⼯程clean-->compile<br /><br /><br />查看nacos是否注册上<br /><a name="O98Q1"></a>## 1.4 seata⼯程结构<br /><br />先来看下整体seata 的⼯程结构<br />- **seata-common 模块:** seata-common 项⽬,提供 Seata 封装的⼯具类、异常类等- **seata-core 模块:** seata-core 项⽬,提供 Seata 封装的 RPC、数据模型、通信消息格式等- **seata-config 模块**:从配置中⼼读取配置。- **seata-discovery模块:**⽤于 Seata TC 注册到注册中⼼。⽤于 Seata TM 从注册中⼼发现 Seata TC。- **seata-rm模块:** seata-rm 项⽬,Seata 对 RM 的核⼼实现- **seata-rm-datasource 模块:**seata-rm-datasource 项⽬,Seata 通过对 JDBC 拓展,从⽽实现对MySQL 等的透明接⼊ Seata RM 的实现- **seata-server 模块:**seata-server 项⽬,Seata 对 TC 的核⼼实现,提供了事务协调、锁、事务状态、事务会话等功能- **seata-tm模块:**seata-tm 项⽬,Seata 对 TM 的实现,提供了全局事务管理,例如说事务的发起,提交,回滚等- **seata-tcc 模块:**seata-tcc 项⽬,Seata 对 TCC 事务模式的实现- **seata-spring 模块:** seata-spring 项⽬,Spring 对 Seata 集成的实现。例如说,使⽤@GlobalTransactional 注解,⾃动创建全局事务,就是通过 seata-spring 项⽬来实现的。<a name="Yq8aw"></a># 2 AT源码<br /><br /><a name="HaXaB"></a>## 2.1 TC 启动流程****TCserver启动执行main方法:****首先设置日志存储模式**<br />**其次初始化一些线程池 定期检查服务注册 心跳 事务提交 回滚等操作**<br /><br />最后执行服务端初始化1、coordinator.init();```java/*** Init.*/public void init() {//重试回滚线程池 每秒钟执行一次retryRollbacking.scheduleAtFixedRate(() -> {try {handleRetryRollbacking();} catch (Exception e) {LOGGER.info("Exception retry rollbacking ... ", e);}}, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);//重试提交线程池 每秒钟执行一次retryCommitting.scheduleAtFixedRate(() -> {try {handleRetryCommitting();} catch (Exception e) {LOGGER.info("Exception retry committing ... ", e);}}, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);//每秒钟执行一次asyncCommitting.scheduleAtFixedRate(() -> {try {//异步事务提交handleAsyncCommitting();} catch (Exception e) {LOGGER.info("Exception async committing ... ", e);}}, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);timeoutCheck.scheduleAtFixedRate(() -> {try {timeoutCheck();} catch (Exception e) {LOGGER.info("Exception timeout checking ... ", e);}}, 0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);//undoLogDelte删除undoLogDelete.scheduleAtFixedRate(() -> {try {undoLogDelete();} catch (Exception e) {LOGGER.info("Exception undoLog deleting ... ", e);}}, UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);}
查看一下 handleAsyncCommitting() 异步事务提交
- io.seata.core.rpc.netty.NettyRemotingServer#init()方法: 服务端初始化

registerProcessor方法:
super.init()
2.2 TM / RM 初始化与服务注册TC
2.2.1 流程分析
2.2.2 注册主要源码跟踪
- 导⼊seata的起步依赖后,会加载spring-cloud-alibaba-seata-2.1.0.RELEASE.jar下⾯的
spring.factories⽂件,在⽂件中加载GlobalTransactionAutoConfiguration类,针对seata的⾃动配置类
2. 声明GlobalTransactionScanner全局事务扫描Bean
3. GlobalTransactionScanner实现了spring bean⽣命周期接⼝InitializingBean,会执⾏afterPropertiesSet⽅法

4. initClient⽅法

TMClient.init⽅法

getInstance⽅法获取实例
TmNettyRemotingClient构造⽅法public AbstractNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup,ThreadPoolExecutor messageExecutor, NettyPoolKey.TransactionRole transactionRole) {super(messageExecutor);this.transactionRole = transactionRole;//创建Netty客户端引导类clientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, transactionRole);//设置通道处理程序clientBootstrap.setChannelHandlers(new ClientHandler());//创建Netty客户端通道管理器clientChannelManager = new NettyClientChannelManager(new NettyPoolableFactory(this, clientBootstrap), getPoolKeyFunction(), nettyClientConfig);}
ClientHandler客户端处理类
/*** The type ClientHandler.*/@Sharableclass ClientHandler extends ChannelDuplexHandler {//读取通道消息@Overridepublic void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {if (!(msg instanceof RpcMessage)) {return;}//处理消息processMessage(ctx, (RpcMessage) msg);}........}
- init()⽅法
@Overridepublic void init() {//创建一个延时线程 在60秒后执行timerExecutor.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {//连接netty服务端clientChannelManager.reconnect(getTransactionServiceGroup());}}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);if (NettyClientConfig.isEnableClientBatchSendRequest()) {mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,MAX_MERGE_SEND_THREAD,KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));mergeSendExecutorService.submit(new MergedSendRunnable());}super.init();//启动客户端引导类clientBootstrap.start();}
reconnect⽅法

seata-server 的 RegTmProcessor处理TM注册 ```java /*
- Copyright 1999-2019 Seata.io Group. *
- Licensed under the Apache License, Version 2.0 (the “License”);
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at *
- http://www.apache.org/licenses/LICENSE-2.0 *
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an “AS IS” BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License. */ package io.seata.core.rpc.processor.server;
import io.netty.channel.ChannelHandlerContext; import io.seata.common.loader.EnhancedServiceLoader; import io.seata.common.util.NetUtil; import io.seata.core.protocol.RegisterTMRequest; import io.seata.core.protocol.RegisterTMResponse; import io.seata.core.protocol.RpcMessage; import io.seata.core.protocol.Version; import io.seata.core.rpc.netty.ChannelManager; import io.seata.core.rpc.RemotingServer; import io.seata.core.rpc.RegisterCheckAuthHandler; import io.seata.core.rpc.processor.RemotingProcessor; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
/**
- process TM client registry message.
- process message type:
- {@link RegisterTMRequest} *
- @author zhangchenghui.dev@gmail.com
@since 1.3.0 */ public class RegTmProcessor implements RemotingProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(RegTmProcessor.class);
private RemotingServer remotingServer;
private RegisterCheckAuthHandler checkAuthHandler;
public RegTmProcessor(RemotingServer remotingServer) {
this.remotingServer = remotingServer;this.checkAuthHandler = EnhancedServiceLoader.load(RegisterCheckAuthHandler.class);
}
@Override public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
onRegTmMessage(ctx, rpcMessage);
}
private void onRegTmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
RegisterTMRequest message = (RegisterTMRequest) rpcMessage.getBody();String ipAndPort = NetUtil.toStringAddress(ctx.channel().remoteAddress());Version.putChannelVersion(ctx.channel(), message.getVersion());boolean isSuccess = false;String errorInfo = StringUtils.EMPTY;try {if (null == checkAuthHandler || checkAuthHandler.regTransactionManagerCheckAuth(message)) {ChannelManager.registerTMChannel(message, ctx.channel());Version.putChannelVersion(ctx.channel(), message.getVersion());isSuccess = true;if (LOGGER.isDebugEnabled()) {LOGGER.debug("checkAuth for client:{},vgroup:{},applicationId:{}",ipAndPort, message.getTransactionServiceGroup(), message.getApplicationId());}}} catch (Exception exx) {isSuccess = false;errorInfo = exx.getMessage();LOGGER.error("TM register fail, error message:{}", errorInfo);}RegisterTMResponse response = new RegisterTMResponse(isSuccess);if (StringUtils.isNotEmpty(errorInfo)) {response.setMsg(errorInfo);}remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), response);if (LOGGER.isInfoEnabled()) {LOGGER.info("TM register success,message:{},channel:{},client version:{}", message, ctx.channel(),message.getVersion());}
}
}
<a name="L4UWv"></a>## 2.4 TM开启全局事务<br /><br /><a name="GDKVx"></a>### 2.4.1 TM开启全局事务流程分析<a name="nekaH"></a>### 2.3.1 主要源码跟踪<br /><br />1. wrapIfNecessary⽅法```java//容器创建bean的时候 调用此方法检查是否有全局事务@Overrideprotected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {//判断是否有配置全局事务开启if (disableGlobalTransaction) {return bean;}try {synchronized (PROXYED_SET) {if (PROXYED_SET.contains(beanName)) {return bean;}interceptor = null;//检查是否为TCC代理if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCCinterceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));} else {Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);//检查是否有GlobalTransactional注解if (!existsAnnotation(new Class[]{serviceInterface})&& !existsAnnotation(interfacesIfJdk)) {return bean;}if (interceptor == null) {if (globalTransactionalInterceptor == null) {//创建全局事务拦截器globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,(ConfigurationChangeListener)globalTransactionalInterceptor);}interceptor = globalTransactionalInterceptor;}}LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());if (!AopUtils.isAopProxy(bean)) {bean = super.wrapIfNecessary(bean, beanName, cacheKey);} else {AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));for (Advisor avr : advisor) {advised.addAdvisor(0, avr);}}PROXYED_SET.add(beanName);return bean;}} catch (Exception exx) {throw new RuntimeException(exx);}}
- GlobalTransactionalInterceptor的invoke⽅法
```java
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
}Class<?> targetClass =methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);//获取方法上GlobalTransactional注解信息final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, targetClass, GlobalTransactional.class);final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);if (!localDisable) {//判断注解是否为空if (globalTransactionalAnnotation != null) {//处理全局事务return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);} else if (globalLockAnnotation != null) {return handleGlobalLock(methodInvocation);}}}return methodInvocation.proceed();
3. handleGlobalTransaction的execute⽅法```java/*** Execute object.** @param business the business* @return the object* @throws TransactionalExecutor.ExecutionException the execution exception*/public Object execute(TransactionalExecutor business) throws Throwable {// 1 get transactionInfo//获取事务信息TransactionInfo txInfo = business.getTransactionInfo();if (txInfo == null) {throw new ShouldNeverHappenException("transactionInfo does not exist");}// 1.1 获取或创建一个全局事务GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();// 1.2 处理事务传播Propagation propagation = txInfo.getPropagation();SuspendedResourcesHolder suspendedResourcesHolder = null;try {switch (propagation) {case NOT_SUPPORTED:suspendedResourcesHolder = tx.suspend(true);return business.execute();case REQUIRES_NEW:suspendedResourcesHolder = tx.suspend(true);break;case SUPPORTS:if (!existingTransaction()) {return business.execute();}break;case REQUIRED:break;case NEVER:if (existingTransaction()) {throw new TransactionException(String.format("Existing transaction found for transaction marked with propagation 'never',xid = %s",RootContext.getXID()));} else {return business.execute();}case MANDATORY:if (!existingTransaction()) {throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");}break;default:throw new TransactionException("Not Supported Propagation:" + propagation);}try {// 2. 开启事务beginTransaction(txInfo, tx);Object rs = null;try {//执行目标方法rs = business.execute();} catch (Throwable ex) {// 3.回滚业务异常所需completeTransactionAfterThrowing(txInfo, tx, ex);throw ex;}// 4. everything is fine, commit.commitTransaction(tx);return rs;} finally {//5. cleartriggerAfterCompletion();cleanUp();}} finally {tx.resume(suspendedResourcesHolder);}}
- beginTransaction⽅法最终会发送seata server 也就是TC

=>io.seata.tm.DefaultTransactionManager#begin TM事务管理器开启事务@Overridepublic String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {//创建全局开启事务请求GlobalBeginRequest request = new GlobalBeginRequest();request.setTransactionName(name);request.setTimeout(timeout);//同步调用开启事务GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);if (response.getResultCode() == ResultCode.Failed) {throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());}return response.getXid();}
seata-server 的ServerOnRequestProcessor处理请求
=》io.seata.core.rpc.processor.server.ServerOnRequestProcessor#onRequestMessage
6. onRequest的⽅法最终DefaultCoordinator的doGlobalBegin⽅法
core.begin开启@Overridepublic String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {//创建全局会话GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,timeout);//添加监听器session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());//开启事务会话session.begin();// transaction start eventeventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,session.getTransactionName(), session.getBeginTime(), null, session.getStatus()));//返回全局事务XIdreturn session.getXid();}
session.begin()—>DataBaseSessionManager的addGlobalSession

writeSession⽅法
2.5 RM分⽀事务注册
2.5.1 RM分⽀事务注册流程分析
2.5.2 全局事务xid传输分析
注册流程中分支事务通过全局事务Xid来判断当前数据源执行是否是全局事务,Xid由TC生成传给TM,TM和RM为不同的模块,那么TM在feign调用远程微服务(RM)时,RM如何获取Xid来判断是否有全局事务?
在上小节TM开启全局事务时,同步发消息给TC由TC生成全局事务Xid返回给TM
io.seata.tm.api.DefaultGlobalTransaction#begin(int, java.lang.String)
将全局事务xid绑定到seata上下文对象中。
TM调用方式:
通过追踪可以知道orderServiceFeign 使用的是SeataFeignClient 进行远程调用
seataFeign自动配置类:


=》warp(bean) 判断为feignClient时 将bean封装成SeataFeign 也就是说将FeignClient对象bean 替换成了SeataFeignClient
=>SeataFeignClient#execute()发送feign远程调用

=> getModifyRequest()
由此 xid传递到下一个RM中
2.5.3 RM分支事务注册源码分析
代理数据源创建

ConnectionProxy代理连接对象创建
PreparedStatementProxy代理执⾏对象创建
当发起数据库操作的时候最终会调⽤PreparedStatementProxy.execute⽅法

ExecuteTemplate.execute()
executor.execute⽅法

doExecute—>executeAutoCommitTrue(args)

executeAutoCommitFalse(args)⽅法

6. connectionProxy.commit();提交事务⽅法
server端处理分⽀事务注册BranchRegisterRequest类的handle⽅法


2.6 TM/RM 事务提交
2.6.1 TM/RM 事务提交流程分析

2.6.2 主要流程源码分析
- DefaultCore的commit⽅法
```java
@Override
public GlobalStatus commit(String xid) throws TransactionException {
}//从数据库中查询全局事务会话 且设置分支事务与全局事务的关系GlobalSession globalSession = SessionHolder.findGlobalSession(xid);if (globalSession == null) {return GlobalStatus.Finished;}globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());// just lock changeStatusboolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {// the lock should release after branch commit// Highlight: Firstly, close the session, then no more branch can be registered.globalSession.closeAndClean();if (globalSession.getStatus() == GlobalStatus.Begin) {//修改全局事务会话状态为CommittingglobalSession.changeStatus(GlobalStatus.Committing);return true;}return false;});if (!shouldCommit) {return globalSession.getStatus();}//判断是异步提交还是同步提交 TCC和XA模式 需要同步if (globalSession.canBeCommittedAsync()) {globalSession.asyncCommit();return GlobalStatus.Committed;} else {doGlobalCommit(globalSession, false);}return globalSession.getStatus();
```
DefaultCoordinator的init⽅法 线程池调用提交方法

handleAsyncCommitting⽅法
doGlobalCommit⽅法
RmBranchCommitProcessor⽅法


3 TCC源码
3.1 TCC源码分⽀事务注册流程分析

1. GlobalTransactionScanner的wrapIfNecessary⽅法
2. TccActionInterceptor的invoke⽅法

actionInterceptorHandler.proceed⽅法

3. TM在发起全局事务提交后,RM端会根据事务类型选择TCCResourceManager的branchCommit⽅ 法
