一、seata介绍

seata是一款开源的分布式事务解决方案,致力于在微服务建构下提供高性能和简单易用的分布式事务服务。
seata为用户提供了AT、TCC、SAGA和XA事务模式,为用户打造一站式的分布式解决方案。
本文只关注AT模式。

二、seata组成

seata设计上将整体分为三大模块,即TM、RM、TC,具体解释如下:
image.png

  • TM(Transaction Manager):全局事务管理器,控制全局事务边界,负责全局事务开启,全局提交,全局回滚;
  • RM(Resource Manager):资源管理器,控制分支事务,负责分支注册,状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚;
  • TC(Transaction Coordinator):事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交和回滚。

注:比如此时有3个服务分别为订单服务、库存服务、积分服务,此时的业务流程是,
用户下订单—->减库存 —->增加积分。那么订单服务的角色是TM、RM,库存服务、积分服务的角色是RM,seata服务器是TC。

三、seata的AT模式

seata的AT模式是基于两阶段提交模式设计的,以高效且对业务零侵入的方式,解决微服务场景下的分布式问题。

1.AT模式的架构以及原理

seata的分布式事务是一个全局事务,由多个分支事务组成。seata的AT模式具体分为下面两步,

  • 分支(本地)事务执行,将一个本地事务做成一个分布式事务分支,所以许多分布在不同微服务中的本地事务组成了一个全局事务。结果如下:
    • image.png
  • 分支事务提交或回滚,当所有的分支事务全部完成并且都执行成功,这时,TM会发起全局事务提交,TC收到全局事务提交消息后,会通知各分支事务进行提交;同理,当全局事务中的所有分支事务全部完成并且其中有某个分支事务失败,TM会通知TC协调全局事务回滚,进而TC通知各分支事务回滚。

四、seata AT模式入门

1.启动seata server

可以使用docker模式拉取seata server镜像,然后修改相应的配置,然后启动

2.数据库结构

image.png

3.各微服务配置

  1. eureka:
  2. instance:
  3. hostname: 47.115.22.21
  4. prefer-ip-address: true
  5. client:
  6. serviceUrl:
  7. defaultZone: http://${eureka.instance.hostname}:8761/eureka/
  8. feign:
  9. hystrix:
  10. enabled: false
  11. client:
  12. config:
  13. default:
  14. connectTimeout: 5000
  15. readTimeout: 10000
  16. logging:
  17. level:
  18. io:
  19. seata: info
  20. mybatis:
  21. mapperLocations: classpath:mapper/*.xml
  22. typeAliasesPackage: io.seata.sample.entity
  23. server:
  24. port: 8180
  25. spring:
  26. application:
  27. name: order-server
  28. cloud:
  29. alibaba:
  30. seata:
  31. # 设置服务组
  32. tx-service-group: my_test_tx_group
  33. datasource:
  34. driver-class-name: com.mysql.jdbc.Driver
  35. password: xxx
  36. url: jdbc:mysql://47.115.22.21:3306/seata-order
  37. username: xxx

关键的两个文件file.conf,registry.conf
file.conf内容如下

  1. transport {
  2. # tcp udt unix-domain-socket
  3. type = "TCP"
  4. #NIO NATIVE
  5. server = "NIO"
  6. #enable heartbeat
  7. heartbeat = true
  8. # the client batch send request enable
  9. enableClientBatchSendRequest = true
  10. #thread factory for netty
  11. threadFactory {
  12. bossThreadPrefix = "NettyBoss"
  13. workerThreadPrefix = "NettyServerNIOWorker"
  14. serverExecutorThread-prefix = "NettyServerBizHandler"
  15. shareBossWorker = false
  16. clientSelectorThreadPrefix = "NettyClientSelector"
  17. clientSelectorThreadSize = 1
  18. clientWorkerThreadPrefix = "NettyClientWorkerThread"
  19. # netty boss thread size,will not be used for UDT
  20. bossThreadSize = 1
  21. #auto default pin or 8
  22. workerThreadSize = "default"
  23. }
  24. shutdown {
  25. # when destroy server, wait seconds
  26. wait = 3
  27. }
  28. serialization = "seata"
  29. compressor = "none"
  30. }
  31. service {
  32. #transaction service group mapping
  33. vgroupMapping.my_test_tx_group = "default"
  34. #only support when registry.type=file, please don't set multiple addresses
  35. default.grouplist = "47.115.22.21:8091"
  36. #degrade, current not support
  37. enableDegrade = false
  38. #disable seata
  39. disableGlobalTransaction = false
  40. }
  41. client {
  42. rm {
  43. asyncCommitBufferLimit = 10000
  44. lock {
  45. retryInterval = 10
  46. retryTimes = 30
  47. retryPolicyBranchRollbackOnConflict = true
  48. }
  49. reportRetryCount = 5
  50. tableMetaCheckEnable = false
  51. reportSuccessEnable = false
  52. }
  53. tm {
  54. commitRetryCount = 5
  55. rollbackRetryCount = 5
  56. }
  57. undo {
  58. dataValidation = true
  59. logSerialization = "jackson"
  60. logTable = "undo_log"
  61. }
  62. log {
  63. exceptionRate = 100
  64. }
  65. }
registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "eureka"

  nacos {
    serverAddr = "localhost"
    namespace = ""
    cluster = "default"
  }
  eureka {
    serviceUrl = "http://47.115.22.21:8761/eureka"
    application = "default"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = "0"
    password = ""
    cluster = "default"
    timeout = "0"
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
    username = ""
    password = ""
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
  type = "file"

  nacos {
    serverAddr = "localhost"
    namespace = ""
    group = "SEATA_GROUP"
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
    app.id = "seata-server"
    apollo.meta = "http://192.168.1.204:8801"
    namespace = "application"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
    username = ""
    password = ""
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}

4.增加全局事务注解

@Override
@GlobalTransactional(name = "fsp-create-order",rollbackFor = Exception.class)
public void create(Order order) {
    LOGGER.info("------->交易开始");
    //本地方法
    orderDao.create(order);

    //远程方法 扣减库存
    storageApi.decrease(order.getProductId(),order.getCount());

    //远程方法 扣减账户余额

    LOGGER.info("------->扣减账户开始order中");
    accountApi.decrease(order.getUserId(),order.getMoney());
    LOGGER.info("------->扣减账户结束order中");
    int i=10/0;

    LOGGER.info("------->交易结束");
}