# 1 Building Seata from Source

## 1.1 Environment

- JDK 1.8
- Maven 3.2+

## 1.2 Getting the Source

Download the Seata source code from GitHub, choosing version 1.3.0: https://github.com/seata/seata/tree/1.3.0

## 1.3 Importing the Project into IDEA

1. Place the lagou-parent project inside the Seata source tree and import it into IDEA.
2. Check the resulting project layout in IDEA.
3. Replace the contents of `registry.conf` with the settings for the Nacos registry and configuration center that were set up earlier:

```conf
registry {
  # file, nacos, eureka, redis, zk, consul, etcd3, sofa
  type = "nacos"
  loadBalance = "RandomLoadBalance"
  loadBalanceVirtualNodes = 10

  nacos {
    application = "seata-server"
    serverAddr = "106.75.226.220:8848"
    group = "SEATA_GROUP"
    namespace = ""
    cluster = "default"
    username = "nacos"
    password = "nacos"
  }
  eureka {
    serviceUrl = "http://localhost:8761/eureka"
    application = "default"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = 0
    password = ""
    cluster = "default"
    timeout = 0
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

config {
  # file, nacos, apollo, zk, consul, etcd3
  type = "nacos"

  nacos {
    serverAddr = "106.75.226.220:8848"
    namespace = ""
    group = "SEATA_GROUP"
    username = "nacos"
    password = "nacos"
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
    appId = "seata-server"
    apolloMeta = "http://192.168.1.204:8801"
    namespace = "application"
    apolloAccesskeySecret = ""
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}
```

4. Build the project with Maven: run `clean`, then `compile`.

   ![image.png](https://cdn.nlark.com/yuque/0/2021/png/12497888/1619166768454-0d72eaf2-b467-4950-a5a2-3ca3a7e97993.png#clientId=u3a7ce6bc-3a28-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=510&id=u60825a3d&margin=%5Bobject%20Object%5D&name=image.png&originHeight=510&originWidth=772&originalType=binary&ratio=1&rotation=0&showTitle=false&size=47319&status=done&style=none&taskId=uf0f655ba-d21b-4408-b654-4d62703a474&title=&width=772)

   ![image.png](https://cdn.nlark.com/yuque/0/2021/png/12497888/1619166775369-759a60c9-ed15-45fc-b8ae-0c8184b105aa.png#clientId=u3a7ce6bc-3a28-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=607&id=u8613d5dc&margin=%5Bobject%20Object%5D&name=image.png&originHeight=607&originWidth=1648&originalType=binary&ratio=1&rotation=0&showTitle=false&size=104542&status=done&style=none&taskId=u6dc81e89-9a28-4229-9fb3-5271bb1b353&title=&width=1648)

   Then check in Nacos that the server has registered:

   ![image.png](https://cdn.nlark.com/yuque/0/2021/png/12497888/1619166787687-cd6da1e7-c822-43b8-be66-ab2500d65a2c.png#clientId=u3a7ce6bc-3a28-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=360&id=u8fb8155c&margin=%5Bobject%20Object%5D&name=image.png&originHeight=360&originWidth=1854&originalType=binary&ratio=1&rotation=0&showTitle=false&size=42899&status=done&style=none&taskId=u4cf80f56-7d7f-48b9-8cf7-ee9d7d56a7a&title=&width=1854)
<a name="O98Q1"></a>
## 1.4 Seata Project Structure

Start with the overall structure of the Seata project:

![image.png](https://cdn.nlark.com/yuque/0/2021/png/12497888/1619166863001-527fb5f8-68de-4992-ae2b-b8a53cbdb0f3.png#clientId=u3a7ce6bc-3a28-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=758&id=u60274927&margin=%5Bobject%20Object%5D&name=image.png&originHeight=758&originWidth=416&originalType=binary&ratio=1&rotation=0&showTitle=false&size=44534&status=done&style=none&taskId=u74934b33-0a74-4b00-b316-ca592ee072e&title=&width=416)

- **seata-common module:** utility classes, exception classes and similar helpers shared across Seata.
- **seata-core module:** Seata's RPC layer, data model and message formats.
- **seata-config module:** reads configuration from the configuration center.
- **seata-discovery module:** registers the Seata TC with the registry, and lets the Seata TM discover the TC through the registry.
- **seata-rm module:** the core implementation of the Seata RM.
- **seata-rm-datasource module:** the RM implementation that extends JDBC so that MySQL and other databases can join Seata transparently.
- **seata-server module:** the core implementation of the Seata TC, providing transaction coordination, locks, transaction status, transaction sessions and related features.
- **seata-tm module:** the Seata TM implementation, providing global transaction management such as beginning, committing and rolling back transactions.
- **seata-tcc module:** the implementation of Seata's TCC transaction mode.
- **seata-spring module:** the Spring integration for Seata. For example, automatically creating a global transaction from the `@GlobalTransactional` annotation is implemented here.
<a name="Yq8aw"></a>
# 2 AT Mode Source Code

<a name="HaXaB"></a>
## 2.1 TC Startup Flow

**The TC server starts by running its `main` method, which:**

1. first sets the log storage mode;
2. then initializes several thread pools that periodically handle service-registration checks, heartbeats, transaction commits, rollbacks and similar work;
3. finally runs the server-side initialization.

![image.png](https://cdn.nlark.com/yuque/0/2021/png/12497888/1619320158720-8b60a86e-0465-4875-8c48-7b468e362b0c.png#clientId=u671ca88b-9f99-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=710&id=u73b43ee0&margin=%5Bobject%20Object%5D&name=image.png&originHeight=710&originWidth=884&originalType=binary&ratio=1&rotation=0&showTitle=false&size=126055&status=done&style=none&taskId=u98d710b9-41b3-43d0-a231-fa718d679c6&title=&width=884)
`coordinator.init()` schedules the TC's periodic background tasks:

```java
/**
 * Init.
 */
public void init() {
    // Retry-rollback pool: runs once per second
    retryRollbacking.scheduleAtFixedRate(() -> {
        try {
            handleRetryRollbacking();
        } catch (Exception e) {
            LOGGER.info("Exception retry rollbacking ... ", e);
        }
    }, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    // Retry-commit pool: runs once per second
    retryCommitting.scheduleAtFixedRate(() -> {
        try {
            handleRetryCommitting();
        } catch (Exception e) {
            LOGGER.info("Exception retry committing ... ", e);
        }
    }, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    // Runs once per second
    asyncCommitting.scheduleAtFixedRate(() -> {
        try {
            // Asynchronous transaction commit
            handleAsyncCommitting();
        } catch (Exception e) {
            LOGGER.info("Exception async committing ... ", e);
        }
    }, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    // Global transaction timeout check
    timeoutCheck.scheduleAtFixedRate(() -> {
        try {
            timeoutCheck();
        } catch (Exception e) {
            LOGGER.info("Exception timeout checking ... ", e);
        }
    }, 0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    // Undo log deletion
    undoLogDelete.scheduleAtFixedRate(() -> {
        try {
            undoLogDelete();
        } catch (Exception e) {
            LOGGER.info("Exception undoLog deleting ... ", e);
        }
    }, UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
}
```
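Every task in `init()` wraps its body in `try`/`catch`. One likely reason is a documented property of `ScheduledExecutorService.scheduleAtFixedRate`: if a run throws, all later runs are suppressed. The following is a minimal, standalone sketch of that behaviour using only the JDK; the class and method names (`GuardedScheduleSketch`, `keepsRunning`) are made up for illustration and are not part of Seata.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class GuardedScheduleSketch {

    /**
     * Schedules a task that throws on every run and reports whether it still
     * completed the requested number of runs before the timeout.
     */
    static boolean keepsRunning(boolean guarded, int runs, long timeoutMillis) {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(runs);
        Runnable failing = () -> {
            latch.countDown();
            throw new IllegalStateException("simulated failure");
        };
        Runnable task = failing;
        if (guarded) {
            task = () -> {
                try {
                    failing.run();
                } catch (Exception e) {
                    // A real server would log here; swallowing keeps the schedule alive.
                }
            };
        }
        try {
            pool.scheduleAtFixedRate(task, 0, 10, TimeUnit.MILLISECONDS);
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("guarded task keeps running:   " + keepsRunning(true, 3, 2000));
        System.out.println("unguarded task keeps running: " + keepsRunning(false, 3, 300));
    }
}
```

The unguarded variant runs once and is then cancelled by the executor, which is the failure mode a long-lived retry loop has to avoid.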

Next, look at `handleAsyncCommitting()`, which performs the asynchronous transaction commit.

`io.seata.core.rpc.netty.NettyRemotingServer#init()` performs the server-side initialization. From there, follow two calls in order:

- `registerProcessor`
- `super.init()`

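## 2.2 TM/RM Initialization and Registration with the TC

### 2.2.1 Flow Analysis

[Figure: TM/RM initialization and registration flow]

### 2.2.2 Tracing the Registration Source Code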

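1. Once the Seata starter dependency is added, the `spring.factories` file inside `spring-cloud-alibaba-seata-2.1.0.RELEASE.jar` is loaded. That file registers `GlobalTransactionAutoConfiguration`, the auto-configuration class for Seata.
2. The auto-configuration declares the `GlobalTransactionScanner` bean, which scans for global transactions.
3. `GlobalTransactionScanner` implements the Spring bean lifecycle interface `InitializingBean`, so its `afterPropertiesSet` method is executed.
4. `afterPropertiesSet` leads into the `initClient` method.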

5. The `TMClient.init` method obtains the client instance through `getInstance`. The `TmNettyRemotingClient` constructor chains to its parent class, `AbstractNettyRemotingClient`:

```java
public AbstractNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup,
                                   ThreadPoolExecutor messageExecutor, NettyPoolKey.TransactionRole transactionRole) {
    super(messageExecutor);
    this.transactionRole = transactionRole;
    // Create the Netty client bootstrap
    clientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, transactionRole);
    // Set the channel handlers
    clientBootstrap.setChannelHandlers(new ClientHandler());
    // Create the Netty client channel manager
    clientChannelManager = new NettyClientChannelManager(
        new NettyPoolableFactory(this, clientBootstrap), getPoolKeyFunction(), nettyClientConfig);
}
```

`ClientHandler` is the client-side handler class:

```java
/**
 * The type ClientHandler.
 */
@Sharable
class ClientHandler extends ChannelDuplexHandler {

    // Read a message from the channel
    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!(msg instanceof RpcMessage)) {
            return;
        }
        // Process the message
        processMessage(ctx, (RpcMessage) msg);
    }

    // ... (remaining methods omitted)
}
```
6. The `init()` method:

```java
@Override
public void init() {
    // Scheduled task: first runs after SCHEDULE_DELAY_MILLS (60 seconds per the
    // original note), then repeats every SCHEDULE_INTERVAL_MILLS
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            // Connect to the Netty server
            clientChannelManager.reconnect(getTransactionServiceGroup());
        }
    }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
    if (NettyClientConfig.isEnableClientBatchSendRequest()) {
        mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
            MAX_MERGE_SEND_THREAD,
            KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }
    super.init();
    // Start the client bootstrap
    clientBootstrap.start();
}
```

7. The `reconnect` method is what the scheduled task above calls to connect to the server.

8. On seata-server, `RegTmProcessor` handles TM registration:

```java
/*
 *  Copyright 1999-2019 Seata.io Group.
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package io.seata.core.rpc.processor.server;

import io.netty.channel.ChannelHandlerContext;
import io.seata.common.loader.EnhancedServiceLoader;
import io.seata.common.util.NetUtil;
import io.seata.core.protocol.RegisterTMRequest;
import io.seata.core.protocol.RegisterTMResponse;
import io.seata.core.protocol.RpcMessage;
import io.seata.core.protocol.Version;
import io.seata.core.rpc.netty.ChannelManager;
import io.seata.core.rpc.RemotingServer;
import io.seata.core.rpc.RegisterCheckAuthHandler;
import io.seata.core.rpc.processor.RemotingProcessor;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * process TM client registry message.
 * <p>
 * process message type:
 * {@link RegisterTMRequest}
 *
 * @author zhangchenghui.dev@gmail.com
 * @since 1.3.0
 */
public class RegTmProcessor implements RemotingProcessor {

    private static final Logger LOGGER = LoggerFactory.getLogger(RegTmProcessor.class);

    private RemotingServer remotingServer;

    private RegisterCheckAuthHandler checkAuthHandler;

    public RegTmProcessor(RemotingServer remotingServer) {
        this.remotingServer = remotingServer;
        this.checkAuthHandler = EnhancedServiceLoader.load(RegisterCheckAuthHandler.class);
    }

    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        onRegTmMessage(ctx, rpcMessage);
    }

    private void onRegTmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
        RegisterTMRequest message = (RegisterTMRequest) rpcMessage.getBody();
        String ipAndPort = NetUtil.toStringAddress(ctx.channel().remoteAddress());
        Version.putChannelVersion(ctx.channel(), message.getVersion());
        boolean isSuccess = false;
        String errorInfo = StringUtils.EMPTY;
        try {
            if (null == checkAuthHandler || checkAuthHandler.regTransactionManagerCheckAuth(message)) {
                ChannelManager.registerTMChannel(message, ctx.channel());
                Version.putChannelVersion(ctx.channel(), message.getVersion());
                isSuccess = true;
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("checkAuth for client:{},vgroup:{},applicationId:{}",
                        ipAndPort, message.getTransactionServiceGroup(), message.getApplicationId());
                }
            }
        } catch (Exception exx) {
            isSuccess = false;
            errorInfo = exx.getMessage();
            LOGGER.error("TM register fail, error message:{}", errorInfo);
        }
        RegisterTMResponse response = new RegisterTMResponse(isSuccess);
        if (StringUtils.isNotEmpty(errorInfo)) {
            response.setMsg(errorInfo);
        }
        remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), response);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("TM register success,message:{},channel:{},client version:{}", message, ctx.channel(),
                message.getVersion());
        }
    }

}
```

<a name="L4UWv"></a>
## 2.4 TM Begins a Global Transaction

<a name="GDKVx"></a>
### 2.4.1 Flow Analysis

![image.png](https://cdn.nlark.com/yuque/0/2021/png/12497888/1619321942693-0f9180f0-914c-44ac-a978-3dc2eae4408b.png#clientId=u671ca88b-9f99-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=1289&id=u7514882c&margin=%5Bobject%20Object%5D&name=image.png&originHeight=1289&originWidth=1781&originalType=binary&ratio=1&rotation=0&showTitle=false&size=121126&status=done&style=none&taskId=u9c23afe0-de92-4a41-8362-8231dfec6d6&title=&width=1781)

<a name="nekaH"></a>
### 2.4.2 Tracing the Main Source Code
1. The `wrapIfNecessary` method:

```java
// Called when the container creates a bean, to check whether it takes part in global transactions
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    // Skip everything if global transactions are disabled by configuration
    if (disableGlobalTransaction) {
        return bean;
    }
    try {
        synchronized (PROXYED_SET) {
            if (PROXYED_SET.contains(beanName)) {
                return bean;
            }
            interceptor = null;
            // Check whether this bean needs the TCC proxy
            if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
            } else {
                Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                // Check for the GlobalTransactional annotation
                if (!existsAnnotation(new Class[]{serviceInterface})
                    && !existsAnnotation(interfacesIfJdk)) {
                    return bean;
                }
                if (interceptor == null) {
                    if (globalTransactionalInterceptor == null) {
                        // Create the global transaction interceptor
                        globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                        ConfigurationCache.addConfigListener(
                            ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                            (ConfigurationChangeListener)globalTransactionalInterceptor);
                    }
                    interceptor = globalTransactionalInterceptor;
                }
            }
            LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
            if (!AopUtils.isAopProxy(bean)) {
                bean = super.wrapIfNecessary(bean, beanName, cacheKey);
            } else {
                AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                for (Advisor avr : advisor) {
                    advised.addAdvisor(0, avr);
                }
            }
            PROXYED_SET.add(beanName);
            return bean;
        }
    } catch (Exception exx) {
        throw new RuntimeException(exx);
    }
}
```
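The core idea of `wrapIfNecessary` (look for a transaction annotation, and only then put an interceptor in front of the bean) can be illustrated without Spring. The sketch below uses a JDK dynamic proxy; `DemoGlobalTx`, `OrderService` and the `EVENTS` list are hypothetical stand-ins invented for this example, and the real scanner relies on Spring AOP rather than `java.lang.reflect.Proxy`.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class AnnotationProxySketch {

    /** Hypothetical stand-in for @GlobalTransactional. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface DemoGlobalTx {}

    interface OrderService {
        @DemoGlobalTx
        String createOrder(String item);

        String findOrder(String id);
    }

    static class OrderServiceImpl implements OrderService {
        public String createOrder(String item) { return "order:" + item; }
        public String findOrder(String id) { return "found:" + id; }
    }

    /** Records what the interceptor did, so the behaviour is observable. */
    static final List<String> EVENTS = new ArrayList<>();

    /** True if any public method of the type carries the annotation. */
    static boolean existsAnnotation(Class<?> type) {
        for (Method m : type.getMethods()) {
            if (m.isAnnotationPresent(DemoGlobalTx.class)) {
                return true;
            }
        }
        return false;
    }

    /** Returns the bean unchanged unless its interface declares an annotated method. */
    @SuppressWarnings("unchecked")
    static <T> T wrapIfNecessary(T bean, Class<T> iface) {
        if (!existsAnnotation(iface)) {
            return bean;
        }
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface},
            (proxy, method, args) -> {
                boolean tx = method.isAnnotationPresent(DemoGlobalTx.class);
                if (tx) {
                    EVENTS.add("begin " + method.getName());
                }
                try {
                    Object result = method.invoke(bean, args);
                    if (tx) {
                        EVENTS.add("commit " + method.getName());
                    }
                    return result;
                } catch (InvocationTargetException e) {
                    if (tx) {
                        EVENTS.add("rollback " + method.getName());
                    }
                    throw e.getCause();
                }
            });
    }

    public static void main(String[] args) {
        OrderService service = wrapIfNecessary(new OrderServiceImpl(), OrderService.class);
        service.createOrder("book");
        service.findOrder("1");
        System.out.println(EVENTS);
    }
}
```

Only the annotated method is bracketed by begin and commit events; the plain method passes straight through the proxy.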
2. The `invoke` method of `GlobalTransactionalInterceptor`:

```java
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    Class<?> targetClass =
        methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
    Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
    if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
        // Read the GlobalTransactional annotation from the method
        final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, targetClass, GlobalTransactional.class);
        final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
        boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
        if (!localDisable) {
            // Check whether the annotation is present
            if (globalTransactionalAnnotation != null) {
                // Handle the global transaction
                return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
            } else if (globalLockAnnotation != null) {
                return handleGlobalLock(methodInvocation);
            }
        }
    }
    return methodInvocation.proceed();
}
```
3. `handleGlobalTransaction` calls the `execute` method of the transactional template:

```java
/**
 * Execute object.
 *
 * @param business the business
 * @return the object
 * @throws TransactionalExecutor.ExecutionException the execution exception
 */
public Object execute(TransactionalExecutor business) throws Throwable {
    // 1. Get the transaction info
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    // 1.1 Get the current global transaction, or create one
    GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
    // 1.2 Handle transaction propagation
    Propagation propagation = txInfo.getPropagation();
    SuspendedResourcesHolder suspendedResourcesHolder = null;
    try {
        switch (propagation) {
            case NOT_SUPPORTED:
                suspendedResourcesHolder = tx.suspend(true);
                return business.execute();
            case REQUIRES_NEW:
                suspendedResourcesHolder = tx.suspend(true);
                break;
            case SUPPORTS:
                if (!existingTransaction()) {
                    return business.execute();
                }
                break;
            case REQUIRED:
                break;
            case NEVER:
                if (existingTransaction()) {
                    throw new TransactionException(
                        String.format("Existing transaction found for transaction marked with propagation 'never',xid = %s"
                            ,RootContext.getXID()));
                } else {
                    return business.execute();
                }
            case MANDATORY:
                if (!existingTransaction()) {
                    throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                }
                break;
            default:
                throw new TransactionException("Not Supported Propagation:" + propagation);
        }
        try {
            // 2. Begin the transaction
            beginTransaction(txInfo, tx);
            Object rs = null;
            try {
                // Run the target (business) method
                rs = business.execute();
            } catch (Throwable ex) {
                // 3. Roll back if the business exception requires it
                completeTransactionAfterThrowing(txInfo, tx, ex);
                throw ex;
            }
            // 4. everything is fine, commit.
            commitTransaction(tx);
            return rs;
        } finally {
            // 5. clear
            triggerAfterCompletion();
            cleanUp();
        }
    } finally {
        tx.resume(suspendedResourcesHolder);
    }
}
```
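Stripped of propagation handling, `execute` is a template method with a fixed order: begin, run the business code, roll back on failure or commit on success, and always clean up. The sketch below reproduces only that ordering in plain Java. `TxTemplateSketch` and its `trace` list are illustrative names, and the sketch records steps instead of talking to a TC.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class TxTemplateSketch {

    /** Records each step so the ordering can be inspected. */
    final List<String> trace = new ArrayList<>();

    <T> T execute(Supplier<T> business) {
        trace.add("begin");                 // 2. begin the transaction
        try {
            T result;
            try {
                result = business.get();    // run the business method
            } catch (RuntimeException ex) {
                trace.add("rollback");      // 3. roll back, then rethrow to the caller
                throw ex;
            }
            trace.add("commit");            // 4. everything is fine, commit
            return result;
        } finally {
            trace.add("cleanup");           // 5. always clean up
        }
    }

    /** Runs one business call and returns the recorded steps. */
    static List<String> run(boolean fail) {
        TxTemplateSketch template = new TxTemplateSketch();
        try {
            template.execute(() -> {
                if (fail) {
                    throw new IllegalStateException("business failure");
                }
                return "ok";
            });
        } catch (IllegalStateException expected) {
            // The caller still sees the business exception after the rollback.
        }
        return template.trace;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [begin, commit, cleanup]
        System.out.println(run(true));  // [begin, rollback, cleanup]
    }
}
```

Note that the rollback branch rethrows, so the business exception is never swallowed by the template.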
4. `beginTransaction` eventually sends a request to the Seata server, that is, the TC. The call ends up in `io.seata.tm.DefaultTransactionManager#begin`, where the TM's transaction manager begins the transaction:

```java
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    // Build the global-begin request
    GlobalBeginRequest request = new GlobalBeginRequest();
    request.setTransactionName(name);
    request.setTimeout(timeout);
    // Synchronous call to begin the transaction
    GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
    if (response.getResultCode() == ResultCode.Failed) {
        throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
    }
    return response.getXid();
}
```
5. On seata-server, `ServerOnRequestProcessor` handles the request in `io.seata.core.rpc.processor.server.ServerOnRequestProcessor#onRequestMessage`.
6. The `onRequest` method eventually reaches `doGlobalBegin` in `DefaultCoordinator`, which calls `core.begin` to begin the transaction:

```java
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    // Create the global session
    GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
        timeout);
    // Add the lifecycle listener
    session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    // Begin the transaction session
    session.begin();
    // transaction start event
    eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
        session.getTransactionName(), session.getBeginTime(), null, session.getStatus()));
    // Return the global transaction XID
    return session.getXid();
}
```

`session.begin()` leads to `addGlobalSession` in `DataBaseSessionManager`, which persists the session through the `writeSession` method.

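## 2.5 RM Branch Transaction Registration

### 2.5.1 Flow Analysis

[Figure: RM branch transaction registration flow]

### 2.5.2 How the Global Transaction XID Is Propagated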

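During registration, a branch transaction uses the global transaction XID to decide whether the current data-source operation belongs to a global transaction. The XID is generated by the TC and returned to the TM. The TM and the RM are separate modules, so when the TM calls a remote microservice (the RM) through Feign, how does the RM obtain the XID and learn that a global transaction exists?

As the previous section showed, when the TM begins a global transaction it sends a synchronous message to the TC, which generates the global transaction XID and returns it to the TM. In `io.seata.tm.api.DefaultGlobalTransaction#begin(int, java.lang.String)` the XID is then bound to the Seata context object.

On the TM side the remote call goes through a Feign client. Tracing it shows the following chain:

1. `orderServiceFeign` uses `SeataFeignClient` to make the remote call.
2. This is set up by the Seata Feign auto-configuration class.
3. `wrap(bean)`: when the bean is a Feign client, it is wrapped as a Seata Feign client. In other words, the `FeignClient` bean is replaced by a `SeataFeignClient`.
4. `SeataFeignClient#execute()` sends the Feign remote call.
5. `getModifyRequest()` rewrites the outgoing request so that it carries the XID.

In this way the XID is passed on to the next RM.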
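The propagation idea can be sketched without Feign or Spring: the caller copies the XID from a thread-bound context into the request headers, and the callee binds it back into its own context. In the sketch below the header name `TX_XID` is assumed to match Seata's `RootContext.KEY_XID`, the XID value is made up, and the "remote call" is simulated with a second thread. None of these helper names come from Seata.

```java
import java.util.HashMap;
import java.util.Map;

class XidPropagationSketch {

    /** Header name used in this sketch; assumed to mirror RootContext.KEY_XID. */
    static final String KEY_XID = "TX_XID";

    /** Thread-bound holder, a stand-in for the Seata context object. */
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    /** Caller side: copy the headers and add the XID when one is bound. */
    static Map<String, String> modifyRequest(Map<String, String> headers) {
        String xid = CONTEXT.get();
        if (xid == null || xid.isEmpty()) {
            return headers;
        }
        Map<String, String> copy = new HashMap<>(headers);
        copy.put(KEY_XID, xid);
        return copy;
    }

    /** Callee side: bind the XID found in the incoming headers, if any. */
    static void bindFromHeaders(Map<String, String> headers) {
        String xid = headers.get(KEY_XID);
        if (xid != null) {
            CONTEXT.set(xid);
        }
    }

    /** Simulates one call from a TM thread to an RM thread; returns the XID the RM saw. */
    static String roundTrip(String xid) {
        CONTEXT.set(xid);                                   // TM binds the XID after begin
        Map<String, String> sent = modifyRequest(new HashMap<>());
        CONTEXT.remove();

        String[] seen = new String[1];
        Thread callee = new Thread(() -> {
            bindFromHeaders(sent);                          // RM restores the XID
            seen[0] = CONTEXT.get();
            CONTEXT.remove();
        });
        callee.start();
        try {
            callee.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("192.168.0.1:8091:42")); // hypothetical XID value
        System.out.println(roundTrip(null));
    }
}
```

A thread-local alone cannot cross a process boundary, which is why the value has to travel in the request itself.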

### 2.5.3 RM Branch Transaction Registration: Source Analysis

1. The proxy data source is created, followed by the `ConnectionProxy` (the proxied connection) and the `PreparedStatementProxy` (the proxied statement).
2. When a database operation is issued, it eventually reaches `PreparedStatementProxy.execute`, which delegates to `ExecuteTemplate.execute()`.
3. `ExecuteTemplate` then calls `executor.execute`.
4. `doExecute` leads to `executeAutoCommitTrue(args)`.
5. Next comes `executeAutoCommitFalse(args)`.
6. `connectionProxy.commit()` commits the transaction.
7. On the server side, branch registration is handled by the `handle` method of `BranchRegisterRequest`.
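For orientation, Seata's AT mode is documented as recording a before image and an after image around each business update, and as using them as an undo log if the global transaction later rolls back. The sketch below shows that idea on an in-memory map. It is a deliberately simplified illustration with invented names (`UndoLogSketch`, `UndoRecord`), not the code behind the methods listed above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class UndoLogSketch {

    /** In-memory stand-in for one table: primary key -> row value. */
    final Map<Integer, String> table = new HashMap<>();

    /** One undo record: the row's value before and after the update. */
    static final class UndoRecord {
        final int key;
        final String before;
        final String after;

        UndoRecord(int key, String before, String after) {
            this.key = key;
            this.before = before;
            this.after = after;
        }
    }

    /** Phase one: capture the before image, apply the update, capture the after image. */
    UndoRecord update(int key, String newValue) {
        String before = table.get(key);
        table.put(key, newValue);
        String after = table.get(key);
        return new UndoRecord(key, before, after);
    }

    /**
     * Phase-two rollback: restore the before image, but only if the row still
     * matches the after image. Otherwise something else changed the row and
     * this sketch refuses to overwrite it.
     */
    boolean rollback(UndoRecord undo) {
        if (!Objects.equals(table.get(undo.key), undo.after)) {
            return false;
        }
        if (undo.before == null) {
            table.remove(undo.key);
        } else {
            table.put(undo.key, undo.before);
        }
        return true;
    }

    public static void main(String[] args) {
        UndoLogSketch db = new UndoLogSketch();
        db.table.put(1, "stock=10");
        UndoRecord undo = db.update(1, "stock=9");
        System.out.println(db.table + " rolled back: " + db.rollback(undo) + " -> " + db.table);
    }
}
```

Comparing against the after image before restoring is what lets a rollback detect that the row was modified outside the global transaction.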

## 2.6 TM/RM Transaction Commit

### 2.6.1 Flow Analysis

[Figure: TM/RM transaction commit flow]

### 2.6.2 Main Flow: Source Analysis

1. The `commit` method of `DefaultCore`:

```java
@Override
public GlobalStatus commit(String xid) throws TransactionException {
    // Load the global session from the database and link its branch sessions to it
    GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
    if (globalSession == null) {
        return GlobalStatus.Finished;
    }
    globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    // just lock changeStatus
    boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
        // the lock should release after branch commit
        // Highlight: Firstly, close the session, then no more branch can be registered.
        globalSession.closeAndClean();
        if (globalSession.getStatus() == GlobalStatus.Begin) {
            // Change the global session status to Committing
            globalSession.changeStatus(GlobalStatus.Committing);
            return true;
        }
        return false;
    });
    if (!shouldCommit) {
        return globalSession.getStatus();
    }
    // Choose asynchronous or synchronous commit; TCC and XA modes must commit synchronously
    if (globalSession.canBeCommittedAsync()) {
        globalSession.asyncCommit();
        return GlobalStatus.Committed;
    } else {
        doGlobalCommit(globalSession, false);
    }
    return globalSession.getStatus();
}
```
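Two decisions in `commit` are worth isolating: the status may move from `Begin` to `Committing` only once, under a lock, and a session that can commit asynchronously is reported as `Committed` right away while the second phase is left to a background task. The sketch below models just those two decisions. All names (`CommitDecisionSketch`, `Session`, `asyncQueue`) are illustrative, and the queue stands in for whatever the TC uses to track sessions awaiting asynchronous commit.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;

class CommitDecisionSketch {

    enum Status { BEGIN, COMMITTING, ASYNC_COMMITTING, COMMITTED }

    static final class Session {
        final boolean asyncCapable;
        final ReentrantLock lock = new ReentrantLock();
        Status status = Status.BEGIN;

        Session(boolean asyncCapable) {
            this.asyncCapable = asyncCapable;
        }
    }

    /** Sessions waiting for the background second phase. */
    final Queue<Session> asyncQueue = new ArrayDeque<>();

    Status commit(Session session) {
        boolean shouldCommit;
        session.lock.lock();
        try {
            // Only the first commit request sees BEGIN and flips the status.
            shouldCommit = session.status == Status.BEGIN;
            if (shouldCommit) {
                session.status = Status.COMMITTING;
            }
        } finally {
            session.lock.unlock();
        }
        if (!shouldCommit) {
            return session.status;          // duplicate request: report the current status
        }
        if (session.asyncCapable) {
            session.status = Status.ASYNC_COMMITTING;
            asyncQueue.add(session);
            return Status.COMMITTED;        // reported to the caller immediately
        }
        doGlobalCommit(session);            // synchronous second phase
        return session.status;
    }

    void doGlobalCommit(Session session) {
        session.status = Status.COMMITTED;
    }

    /** Background step: finish every queued session and return how many were handled. */
    int handleAsyncCommitting() {
        int handled = 0;
        Session next;
        while ((next = asyncQueue.poll()) != null) {
            doGlobalCommit(next);
            handled++;
        }
        return handled;
    }

    public static void main(String[] args) {
        CommitDecisionSketch core = new CommitDecisionSketch();
        Session at = new Session(true);
        System.out.println(core.commit(at) + " / internal status " + at.status);
        System.out.println("handled in background: " + core.handleAsyncCommitting());
    }
}
```

Reporting success before the second phase finishes is safe only when that phase cannot fail in a way the caller must see, which is why the original comment singles out TCC and XA as needing the synchronous path.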

2. The thread pool started in `DefaultCoordinator`'s `init` method invokes the commit logic: `handleAsyncCommitting` runs periodically and calls `doGlobalCommit`.
3. On the RM side, `RmBranchCommitProcessor` handles the branch commit request.

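# 3 TCC Mode Source Code

## 3.1 TCC Branch Transaction Registration Flow

[Figure: TCC branch transaction registration flow]

1. The `wrapIfNecessary` method of `GlobalTransactionScanner`.
2. The `invoke` method of `TccActionInterceptor`, which calls `actionInterceptorHandler.proceed`.
3. After the TM initiates the global commit, the RM side selects the `branchCommit` method of `TCCResourceManager` according to the transaction type.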
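The general TCC pattern behind this flow can be shown in a few lines: each participant reserves resources in a try phase, and the coordinator then either confirms every participant or cancels the ones that already reserved. The sketch below is a minimal illustration with made-up names (`TccAction`, `Account`, `runGlobal`). The try phase is called `prepare` because `try` is a Java keyword, and no Seata API is used.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TccSketch {

    /** The three operations every TCC participant provides. */
    interface TccAction {
        boolean prepare(String xid);   // try: reserve resources
        void commit(String xid);       // confirm: consume the reservation
        void rollback(String xid);     // cancel: release the reservation
    }

    /** Participant that deducts a fixed amount from an account. */
    static final class Account implements TccAction {
        int balance;
        int frozen;
        final int amount;

        Account(int balance, int amount) {
            this.balance = balance;
            this.amount = amount;
        }

        public boolean prepare(String xid) {
            if (balance < amount) {
                return false;
            }
            balance -= amount;
            frozen += amount;
            return true;
        }

        public void commit(String xid) {
            frozen -= amount;
        }

        public void rollback(String xid) {
            frozen -= amount;
            balance += amount;
        }
    }

    /** Runs the try phase on all actions, then confirms all or cancels the prepared ones. */
    static boolean runGlobal(String xid, List<TccAction> actions) {
        List<TccAction> prepared = new ArrayList<>();
        for (TccAction action : actions) {
            if (action.prepare(xid)) {
                prepared.add(action);
            } else {
                for (TccAction done : prepared) {
                    done.rollback(xid);
                }
                return false;
            }
        }
        for (TccAction done : prepared) {
            done.commit(xid);
        }
        return true;
    }

    public static void main(String[] args) {
        Account rich = new Account(100, 30);
        Account poor = new Account(10, 30);
        boolean ok = runGlobal("demo-xid", Arrays.<TccAction>asList(rich, poor));
        System.out.println("committed=" + ok + ", rich.balance=" + rich.balance + ", rich.frozen=" + rich.frozen);
    }
}
```

Unlike AT mode, the participant itself supplies the compensation logic, so no undo log of row images is needed.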