1 Seata 源码搭建
1.1 环境准备
JDK 1.8
Maven 3.2+
1.2 源码构建
从github上,下载seata的源码到本地,版本选择1.3.0 https://github.com/seata/seata/tree/1.3.0
1.3 导⼊idea⼯程
将lagou-parent⼯程放⼊seata源码中导⼊idea
2. idea⼯程⽬录
3. registry.conf ⽂件替换之前已搭建好的nacos注册中⼼和配置中⼼的⽂件 ```java registry {file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = “nacos” loadBalance = “RandomLoadBalance” loadBalanceVirtualNodes = 10
nacos { application = “seata-server” serverAddr = “106.75.226.220:8848” group = “SEATA_GROUP” namespace = “” cluster = “default” username = “nacos” password = “nacos” } eureka { serviceUrl = “http://localhost:8761/eureka“ application = “default” weight = “1” } redis { serverAddr = “localhost:6379” db = 0 password = “” cluster = “default” timeout = 0 } zk { cluster = “default” serverAddr = “127.0.0.1:2181” sessionTimeout = 6000 connectTimeout = 2000 username = “” password = “” } consul { cluster = “default” serverAddr = “127.0.0.1:8500” } etcd3 { cluster = “default” serverAddr = “http://localhost:2379“ } sofa { serverAddr = “127.0.0.1:9603” application = “default” region = “DEFAULT_ZONE” datacenter = “DefaultDataCenter” cluster = “default” group = “SEATA_GROUP” addressWaitTime = “3000” } file { name = “file.conf” } }
config {
file、nacos 、apollo、zk、consul、etcd3
type = “nacos”
nacos { serverAddr = “106.75.226.220:8848” namespace = “” group = “SEATA_GROUP” username = “nacos” password = “nacos” } consul { serverAddr = “127.0.0.1:8500” } apollo { appId = “seata-server” apolloMeta = “http://192.168.1.204:8801“ namespace = “application” apolloAccesskeySecret = “” } zk { serverAddr = “127.0.0.1:2181” sessionTimeout = 6000 connectTimeout = 2000 username = “” password = “” } etcd3 { serverAddr = “http://localhost:2379“ } file { name = “file.conf” } }
4. 编译⼯程clean-->compile<br /><br /><br />查看nacos是否注册上<br />
<a name="O98Q1"></a>
## 1.4 seata⼯程结构<br /><br />
先来看下整体seata 的⼯程结构<br />
- **seata-common 模块:** seata-common 项⽬,提供 Seata 封装的⼯具类、异常类等
- **seata-core 模块:** seata-core 项⽬,提供 Seata 封装的 RPC、数据模型、通信消息格式等
- **seata-config 模块**:从配置中⼼读取配置。
- **seata-discovery模块:**⽤于 Seata TC 注册到注册中⼼。⽤于 Seata TM 从注册中⼼发现 Seata TC。
- **seata-rm模块:** seata-rm 项⽬,Seata 对 RM 的核⼼实现
- **seata-rm-datasource 模块:**seata-rm-datasource 项⽬,Seata 通过对 JDBC 拓展,从⽽实现对MySQL 等的透明接⼊ Seata RM 的实现
- **seata-server 模块:**seata-server 项⽬,Seata 对 TC 的核⼼实现,提供了事务协调、锁、事务状态、事务会话等功能
- **seata-tm模块:**seata-tm 项⽬,Seata 对 TM 的实现,提供了全局事务管理,例如说事务的发起,提交,回滚等
- **seata-tcc 模块:**seata-tcc 项⽬,Seata 对 TCC 事务模式的实现
- **seata-spring 模块:** seata-spring 项⽬,Spring 对 Seata 集成的实现。例如说,使⽤@GlobalTransactional 注解,⾃动创建全局事务,就是通过 seata-spring 项⽬来实现的。
<a name="Yq8aw"></a>
# 2 AT源码<br /><br />
<a name="HaXaB"></a>
## 2.1 TC 启动流程
****TCserver启动执行main方法:**
**首先设置日志存储模式**<br />**其次初始化一些线程池 定期检查服务注册 心跳 事务提交 回滚等操作**<br /><br />最后执行服务端初始化
1、coordinator.init();
```java
/**
* Init.
*/
public void init() {
//重试回滚线程池 每秒钟执行一次
retryRollbacking.scheduleAtFixedRate(() -> {
try {
handleRetryRollbacking();
} catch (Exception e) {
LOGGER.info("Exception retry rollbacking ... ", e);
}
}, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
//重试提交线程池 每秒钟执行一次
retryCommitting.scheduleAtFixedRate(() -> {
try {
handleRetryCommitting();
} catch (Exception e) {
LOGGER.info("Exception retry committing ... ", e);
}
}, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
//每秒钟执行一次
asyncCommitting.scheduleAtFixedRate(() -> {
try {
//异步事务提交
handleAsyncCommitting();
} catch (Exception e) {
LOGGER.info("Exception async committing ... ", e);
}
}, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
timeoutCheck.scheduleAtFixedRate(() -> {
try {
timeoutCheck();
} catch (Exception e) {
LOGGER.info("Exception timeout checking ... ", e);
}
}, 0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);
//undoLogDelte删除
undoLogDelete.scheduleAtFixedRate(() -> {
try {
undoLogDelete();
} catch (Exception e) {
LOGGER.info("Exception undoLog deleting ... ", e);
}
}, UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
}
查看一下 handleAsyncCommitting() 异步事务提交
- io.seata.core.rpc.netty.NettyRemotingServer#init()方法: 服务端初始化
registerProcessor方法:
super.init()
2.2 TM / RM 初始化与服务注册TC
2.2.1 流程分析
2.2.2 注册主要源码跟踪
- 导⼊seata的起步依赖后,会加载spring-cloud-alibaba-seata-2.1.0.RELEASE.jar下⾯的
spring.factories⽂件,在⽂件中加载GlobalTransactionAutoConfiguration类,针对seata的⾃动配置类
2. 声明GlobalTransactionScanner全局事务扫描Bean
3. GlobalTransactionScanner实现了spring bean⽣命周期接⼝InitializingBean,会执⾏afterPropertiesSet⽅法
4. initClient⽅法
TMClient.init⽅法
getInstance⽅法获取实例
TmNettyRemotingClient构造⽅法public AbstractNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup,
ThreadPoolExecutor messageExecutor, NettyPoolKey.TransactionRole transactionRole) {
super(messageExecutor);
this.transactionRole = transactionRole;
//创建Netty客户端引导类
clientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, transactionRole);
//设置通道处理程序
clientBootstrap.setChannelHandlers(new ClientHandler());
//创建Netty客户端通道管理器
clientChannelManager = new NettyClientChannelManager(
new NettyPoolableFactory(this, clientBootstrap), getPoolKeyFunction(), nettyClientConfig);
}
ClientHandler客户端处理类
/**
* The type ClientHandler.
*/
@Sharable
class ClientHandler extends ChannelDuplexHandler {
//读取通道消息
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof RpcMessage)) {
return;
}
//处理消息
processMessage(ctx, (RpcMessage) msg);
}
........
}
- init()⽅法
@Override
public void init() {
//创建一个延时线程 在60秒后执行
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
//连接netty服务端
clientChannelManager.reconnect(getTransactionServiceGroup());
}
}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
if (NettyClientConfig.isEnableClientBatchSendRequest()) {
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
}
super.init();
//启动客户端引导类
clientBootstrap.start();
}
reconnect⽅法
seata-server 的 RegTmProcessor处理TM注册 ```java /*
- Copyright 1999-2019 Seata.io Group. *
- Licensed under the Apache License, Version 2.0 (the “License”);
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at *
- http://www.apache.org/licenses/LICENSE-2.0 *
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an “AS IS” BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License. */ package io.seata.core.rpc.processor.server;
import io.netty.channel.ChannelHandlerContext; import io.seata.common.loader.EnhancedServiceLoader; import io.seata.common.util.NetUtil; import io.seata.core.protocol.RegisterTMRequest; import io.seata.core.protocol.RegisterTMResponse; import io.seata.core.protocol.RpcMessage; import io.seata.core.protocol.Version; import io.seata.core.rpc.netty.ChannelManager; import io.seata.core.rpc.RemotingServer; import io.seata.core.rpc.RegisterCheckAuthHandler; import io.seata.core.rpc.processor.RemotingProcessor; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
/**
- process TM client registry message.
- process message type:
- {@link RegisterTMRequest} *
- @author zhangchenghui.dev@gmail.com
@since 1.3.0 */ public class RegTmProcessor implements RemotingProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(RegTmProcessor.class);
private RemotingServer remotingServer;
private RegisterCheckAuthHandler checkAuthHandler;
public RegTmProcessor(RemotingServer remotingServer) {
this.remotingServer = remotingServer;
this.checkAuthHandler = EnhancedServiceLoader.load(RegisterCheckAuthHandler.class);
}
@Override public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
onRegTmMessage(ctx, rpcMessage);
}
private void onRegTmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
RegisterTMRequest message = (RegisterTMRequest) rpcMessage.getBody();
String ipAndPort = NetUtil.toStringAddress(ctx.channel().remoteAddress());
Version.putChannelVersion(ctx.channel(), message.getVersion());
boolean isSuccess = false;
String errorInfo = StringUtils.EMPTY;
try {
if (null == checkAuthHandler || checkAuthHandler.regTransactionManagerCheckAuth(message)) {
ChannelManager.registerTMChannel(message, ctx.channel());
Version.putChannelVersion(ctx.channel(), message.getVersion());
isSuccess = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("checkAuth for client:{},vgroup:{},applicationId:{}",
ipAndPort, message.getTransactionServiceGroup(), message.getApplicationId());
}
}
} catch (Exception exx) {
isSuccess = false;
errorInfo = exx.getMessage();
LOGGER.error("TM register fail, error message:{}", errorInfo);
}
RegisterTMResponse response = new RegisterTMResponse(isSuccess);
if (StringUtils.isNotEmpty(errorInfo)) {
response.setMsg(errorInfo);
}
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), response);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("TM register success,message:{},channel:{},client version:{}", message, ctx.channel(),
message.getVersion());
}
}
}
<a name="L4UWv"></a>
## 2.4 TM开启全局事务<br /><br />
<a name="GDKVx"></a>
### 2.4.1 TM开启全局事务流程分析

<a name="nekaH"></a>
### 2.3.1 主要源码跟踪<br /><br />
1. wrapIfNecessary⽅法
```java
//容器创建bean的时候 调用此方法检查是否有全局事务
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
//判断是否有配置全局事务开启
if (disableGlobalTransaction) {
return bean;
}
try {
synchronized (PROXYED_SET) {
if (PROXYED_SET.contains(beanName)) {
return bean;
}
interceptor = null;
//检查是否为TCC代理
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
} else {
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
//检查是否有GlobalTransactional注解
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}
if (interceptor == null) {
if (globalTransactionalInterceptor == null) {
//创建全局事务拦截器
globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
ConfigurationCache.addConfigListener(
ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)globalTransactionalInterceptor);
}
interceptor = globalTransactionalInterceptor;
}
}
LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
if (!AopUtils.isAopProxy(bean)) {
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
for (Advisor avr : advisor) {
advised.addAdvisor(0, avr);
}
}
PROXYED_SET.add(beanName);
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}
- GlobalTransactionalInterceptor的invoke⽅法
```java
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
}Class<?> targetClass =
methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
//获取方法上GlobalTransactional注解信息
final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, targetClass, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
if (!localDisable) {
//判断注解是否为空
if (globalTransactionalAnnotation != null) {
//处理全局事务
return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
} else if (globalLockAnnotation != null) {
return handleGlobalLock(methodInvocation);
}
}
}
return methodInvocation.proceed();
3. handleGlobalTransaction的execute⽅法
```java
/**
* Execute object.
*
* @param business the business
* @return the object
* @throws TransactionalExecutor.ExecutionException the execution exception
*/
public Object execute(TransactionalExecutor business) throws Throwable {
// 1 get transactionInfo
//获取事务信息
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
// 1.1 获取或创建一个全局事务
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
// 1.2 处理事务传播
Propagation propagation = txInfo.getPropagation();
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {
switch (propagation) {
case NOT_SUPPORTED:
suspendedResourcesHolder = tx.suspend(true);
return business.execute();
case REQUIRES_NEW:
suspendedResourcesHolder = tx.suspend(true);
break;
case SUPPORTS:
if (!existingTransaction()) {
return business.execute();
}
break;
case REQUIRED:
break;
case NEVER:
if (existingTransaction()) {
throw new TransactionException(
String.format("Existing transaction found for transaction marked with propagation 'never',xid = %s"
,RootContext.getXID()));
} else {
return business.execute();
}
case MANDATORY:
if (!existingTransaction()) {
throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
}
break;
default:
throw new TransactionException("Not Supported Propagation:" + propagation);
}
try {
// 2. 开启事务
beginTransaction(txInfo, tx);
Object rs = null;
try {
//执行目标方法
rs = business.execute();
} catch (Throwable ex) {
// 3.回滚业务异常所需
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
// 4. everything is fine, commit.
commitTransaction(tx);
return rs;
} finally {
//5. clear
triggerAfterCompletion();
cleanUp();
}
} finally {
tx.resume(suspendedResourcesHolder);
}
}
- beginTransaction⽅法最终会发送seata server 也就是TC
=>io.seata.tm.DefaultTransactionManager#begin TM事务管理器开启事务@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
//创建全局开启事务请求
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
//同步调用开启事务
GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
if (response.getResultCode() == ResultCode.Failed) {
throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
}
return response.getXid();
}
seata-server 的ServerOnRequestProcessor处理请求
=》io.seata.core.rpc.processor.server.ServerOnRequestProcessor#onRequestMessage
6. onRequest的⽅法最终DefaultCoordinator的doGlobalBegin⽅法
core.begin开启@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
//创建全局会话
GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
timeout);
//添加监听器
session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
//开启事务会话
session.begin();
// transaction start event
eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
session.getTransactionName(), session.getBeginTime(), null, session.getStatus()));
//返回全局事务XId
return session.getXid();
}
session.begin()—>DataBaseSessionManager的addGlobalSession
writeSession⽅法
2.5 RM分⽀事务注册
2.5.1 RM分⽀事务注册流程分析
2.5.2 全局事务xid传输分析
注册流程中分支事务通过全局事务Xid来判断当前数据源执行是否是全局事务,Xid由TC生成传给TM,TM和RM为不同的模块,那么TM在feign调用远程微服务(RM)时,RM如何获取Xid来判断是否有全局事务?
在上小节TM开启全局事务时,同步发消息给TC由TC生成全局事务Xid返回给TM
io.seata.tm.api.DefaultGlobalTransaction#begin(int, java.lang.String)
将全局事务xid绑定到seata上下文对象中。
TM调用方式:
通过追踪可以知道orderServiceFeign 使用的是SeataFeignClient 进行远程调用
seataFeign自动配置类:
=》warp(bean) 判断为feignClient时 将bean封装成SeataFeign 也就是说将FeignClient对象bean 替换成了SeataFeignClient
=>SeataFeignClient#execute()发送feign远程调用
=> getModifyRequest()
由此 xid传递到下一个RM中
2.5.3 RM分支事务注册源码分析
代理数据源创建
ConnectionProxy代理连接对象创建
PreparedStatementProxy代理执⾏对象创建当发起数据库操作的时候最终会调⽤PreparedStatementProxy.execute⽅法
ExecuteTemplate.execute()executor.execute⽅法
doExecute—>executeAutoCommitTrue(args)
executeAutoCommitFalse(args)⽅法
6. connectionProxy.commit();提交事务⽅法server端处理分⽀事务注册BranchRegisterRequest类的handle⽅法
2.6 TM/RM 事务提交
2.6.1 TM/RM 事务提交流程分析
2.6.2 主要流程源码分析
- DefaultCore的commit⽅法
```java
@Override
public GlobalStatus commit(String xid) throws TransactionException {
}//从数据库中查询全局事务会话 且设置分支事务与全局事务的关系
GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
if (globalSession == null) {
return GlobalStatus.Finished;
}
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// just lock changeStatus
boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
// the lock should release after branch commit
// Highlight: Firstly, close the session, then no more branch can be registered.
globalSession.closeAndClean();
if (globalSession.getStatus() == GlobalStatus.Begin) {
//修改全局事务会话状态为Committing
globalSession.changeStatus(GlobalStatus.Committing);
return true;
}
return false;
});
if (!shouldCommit) {
return globalSession.getStatus();
}
//判断是异步提交还是同步提交 TCC和XA模式 需要同步
if (globalSession.canBeCommittedAsync()) {
globalSession.asyncCommit();
return GlobalStatus.Committed;
} else {
doGlobalCommit(globalSession, false);
}
return globalSession.getStatus();
```
DefaultCoordinator的init⽅法 线程池调用提交方法
handleAsyncCommitting⽅法
doGlobalCommit⽅法RmBranchCommitProcessor⽅法
3 TCC源码
3.1 TCC源码分⽀事务注册流程分析
1. GlobalTransactionScanner的wrapIfNecessary⽅法
2. TccActionInterceptor的invoke⽅法
actionInterceptorHandler.proceed⽅法
3. TM在发起全局事务提交后,RM端会根据事务类型选择TCCResourceManager的branchCommit⽅ 法