在微服务环境中其实不推荐使用分布式事务,因为它会降低性能
但是,有些场景以数据安全为主,性能就是其次的,比如涉及到钱的操作,这个时候就需要使用分布式事务
安装
以Seata 1.1.0版本为例
自1.0.0版本起conf目录下不提供sql文件了,可以从0.9.0版本里面copy过来
安装包直接在github上面下载就行了 Releases · seata/seata (github.com)
配置
registry.conf
配置seata要注册到服务中心,这里以eureka为例
可以给seata定义一个服务名
如果使用eureka配置,registry里其他的配置项可以删除,config也同理
# registry.conf: tells the Seata server which service registry to publish
# itself to. Only the section matching `type` below is read; the unused
# sections can be deleted.
registry {
# Registry type: file, nacos, eureka, redis, zk, consul, etcd3, sofa
type = "eureka"
# Nacos settings (ignored while type = "eureka")
nacos {
serverAddr = "localhost:8848"
namespace = "public"
cluster = "default"
}
# Eureka settings (active, since type = "eureka")
eureka {
serviceUrl = "http://localhost:8761/eureka"
# Service name the Seata server registers under in Eureka
application = "SEATA-SERVER"
weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = "0"
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
consul {
cluster = "default"
serverAddr = "127.0.0.1:8500"
}
etcd3 {
cluster = "default"
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
application = "default"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
cluster = "default"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
}
# Configuration source: where the Seata server loads the rest of its
# configuration from. Only the section matching `type` is read.
config {
# Supported sources: file, nacos, apollo, zk, consul, etcd3
# "file" is selected here, so settings are read from file.conf below
type = "file"
nacos {
serverAddr = "localhost"
namespace = ""
group = "SEATA_GROUP"
}
consul {
serverAddr = "127.0.0.1:8500"
}
apollo {
app.id = "seata-server"
apollo.meta = "http://192.168.1.204:8801"
namespace = "application"
}
zk {
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
name = "file.conf"
}
}
指定file为配置后,file.conf就需要相应修改了
file.conf
主要作用是定义事务分组
如果以数据库存储数据,就要配置数据库的链接
# Network transport between Seata clients (TM/RM) and the server (TC).
transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
# the client batch send request enable
enableClientBatchSendRequest = false
#thread factory for netty
threadFactory {
bossThreadPrefix = "NettyBoss"
workerThreadPrefix = "NettyServerNIOWorker"
serverExecutorThreadPrefix = "NettyServerBizHandler"
shareBossWorker = false
clientSelectorThreadPrefix = "NettyClientSelector"
clientSelectorThreadSize = 1
clientWorkerThreadPrefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
bossThreadSize = 1
#auto default pin or 8
workerThreadSize = "default"
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
# service configuration, only used in client side
service {
#transaction service group mapping
# The segment after "vgroupMapping." is the transaction group name;
# the value is the TC cluster that group is routed to.
vgroupMapping.fsp_tx_group = "default"
#only support when registry.type=file, please don't set multiple addresses
default.grouplist = "127.0.0.1:8091"
#degrade, current not support
enableDegrade = false
#disable seata
disableGlobalTransaction = false
}
#client transaction configuration, only used in client side
client {
# resource manager (branch transaction) settings
rm {
asyncCommitBufferLimit = 10000
lock {
retryInterval = 10
retryTimes = 30
retryPolicyBranchRollbackOnConflict = true
}
reportRetryCount = 5
tableMetaCheckEnable = false
reportSuccessEnable = false
sqlParserType = druid
}
# transaction manager (global transaction) settings
tm {
commitRetryCount = 5
rollbackRetryCount = 5
}
# undo-log settings used for AT-mode rollback
undo {
dataValidation = true
logSerialization = "jackson"
# table that stores rollback records in each business database
logTable = "undo_log"
}
log {
exceptionRate = 100
}
}
## transaction log store, only used in server side
store {
## store mode: file、db
# db mode: transaction logs are stored in a database instead of local files
mode = "db"
## file store property
# (ignored while mode = "db")
file {
## store location dir
dir = "sessionStore"
# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
maxBranchSessionSize = 16384
# globe session size , if exceeded throws exceptions
maxGlobalSessionSize = 512
# file buffer size , if exceeded allocate new buffer
fileWriteBufferCacheSize = 16384
# when recover batch read size
sessionReloadReadSize = 100
# async, sync
flushDiskMode = async
}
## database store property
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
datasource = "dbcp"
## mysql/oracle/h2/oceanbase etc.
dbType = "mysql"
driverClassName = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://127.0.0.1:3306/seata"
user = "root"
password = "1234"
minConn = 1
maxConn = 10
# the three tables below must exist in the `seata` database (DDL further down)
globalTable = "global_table"
branchTable = "branch_table"
lockTable = "lock_table"
queryLimit = 100
}
}
## server configuration, only used in server side
server {
# retry periods for the server-side background recovery schedulers
recovery {
#schedule committing retry period in milliseconds
committingRetryPeriod = 1000
#schedule asyn committing retry period in milliseconds
asynCommittingRetryPeriod = 1000
#schedule rollbacking retry period in milliseconds
rollbackingRetryPeriod = 1000
#schedule timeout retry period in milliseconds
timeoutRetryPeriod = 1000
}
undo {
# days to keep undo logs before the cleanup job deletes them
logSaveDays = 7
#schedule delete expired undo_log in milliseconds
logDeletePeriod = 86400000
}
#unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
maxCommitRetryTimeout = "-1"
maxRollbackRetryTimeout = "-1"
rollbackRetryTimeoutUnlockEnable = false
}
## metrics configuration, only used in server side
metrics {
# disabled by default; enable to export server metrics
enabled = false
registryType = "compact"
# multi exporters use comma divided
exporterList = "prometheus"
exporterPrometheusPort = 9898
}
如果选择以数据库存储数据,就要在指定的数据库(jdbc:mysql://127.0.0.1:3306/seata)中添加三张表
-- Holds one row per global transaction (GlobalSession) managed by the
-- Seata server when store.mode = "db".
DROP TABLE IF EXISTS `global_table`;
CREATE TABLE `global_table` (
    `xid`                       VARCHAR(128)  NOT NULL,
    `transaction_id`            BIGINT,
    `status`                    TINYINT       NOT NULL,
    `application_id`            VARCHAR(32),
    `transaction_service_group` VARCHAR(32),
    `transaction_name`          VARCHAR(128),
    `timeout`                   INT,
    `begin_time`                BIGINT,
    `application_data`          VARCHAR(2000),
    `gmt_create`                DATETIME,
    `gmt_modified`              DATETIME,
    PRIMARY KEY (`xid`),
    KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
    KEY `idx_transaction_id` (`transaction_id`)
);
-- Holds one row per branch transaction (BranchSession) registered under a
-- global transaction, keyed by branch_id and indexed by the owning xid.
DROP TABLE IF EXISTS `branch_table`;
CREATE TABLE `branch_table` (
    `branch_id`         BIGINT        NOT NULL,
    `xid`               VARCHAR(128)  NOT NULL,
    `transaction_id`    BIGINT,
    `resource_group_id` VARCHAR(32),
    `resource_id`       VARCHAR(256),
    `lock_key`          VARCHAR(128),
    `branch_type`       VARCHAR(8),
    `status`            TINYINT,
    `client_id`         VARCHAR(64),
    `application_data`  VARCHAR(2000),
    `gmt_create`        DATETIME,
    `gmt_modified`      DATETIME,
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
);
-- the table to store lock data
-- Global row-lock records for Seata AT mode (one row per locked primary key).
drop table if exists `lock_table`;
create table `lock_table` (
`row_key` varchar(128) not null,
`xid` varchar(96),
-- Fixed: `long` is not a MySQL integer type (LONG is a synonym for
-- MEDIUMTEXT), so these id columns were silently created as text columns.
-- They must be BIGINT to match global_table.transaction_id and
-- branch_table.branch_id.
`transaction_id` bigint,
`branch_id` bigint,
`resource_id` varchar(256) ,
`table_name` varchar(32) ,
`pk` varchar(36) ,
`gmt_create` datetime ,
`gmt_modified` datetime,
primary key(`row_key`)
);
服务启动
seata/bin目录下,一个用于windows环境一个用于linux
以linux为例,使用后台启动命令并输出运行日志,指定端口 ip 运行模式,以file方式运行,不依赖数据库
nohup sh seata-server.sh -p 8091 -h 127.0.0.1 -m file > seata.log 2>&1 &
整合SpringBoot编写Demo
Demo创建两个工程,一个叫order,一个叫pay
Maven依赖
<properties>
<java.version>1.8</java.version>
<alibaba.seata.version>2.2.0.RELEASE</alibaba.seata.version>
<!-- ${seata.version} must match the version of the Seata server being run -->
<seata.version>1.1.0</seata.version>
</properties>
<!-- Seata distributed-transaction integration (Spring Cloud Alibaba) -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-seata</artifactId>
<version>${alibaba.seata.version}</version>
<exclusions>
<!-- exclude the transitive starter so the explicitly pinned one below wins -->
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Seata client, pinned to the server version -->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>${seata.version}</version>
<exclusions>
<exclusion>
<artifactId>protobuf-java</artifactId>
<groupId>com.google.protobuf</groupId>
</exclusion>
</exclusions>
</dependency>
application.yml配置文件
以order为例
# application.yml for the order service.
spring:
  application:
    name: order
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/order?useSSL=false&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai
    username: root
    password: 1234
  cloud:
    alibaba:
      seata:
        # Transaction group this service belongs to
        tx-service-group: fsp_tx_group

# Eureka registration
eureka:
  instance:
    hostname: localhost
    port: 8761
    prefer-ip-address: true
    instance-id: ${spring.cloud.client.ip-address}:${server.port}
    lease-renewal-interval-in-seconds: 5
    lease-expiration-duration-in-seconds: 10
  client:
    registry-fetch-interval-seconds: 5
    service-url:
      defaultZone: http://${eureka.instance.hostname}:${eureka.instance.port}/eureka/

# Seata client configuration
seata:
  address: 127.0.0.1:8091
  enabled: true
  enableAutoDataSourceProxy: false
  # Transaction group; must match spring.cloud.alibaba.seata.tx-service-group above
  tx-service-group: fsp_tx_group
  registry:
    type: eureka
    eureka:
      application: hatech-seata-server
      weight: 1
      service-url: ${eureka.client.service-url.defaultZone}
  transport:
    type: TCP
    server: NIO
    heartbeat: true
    thread-factory:
      boss-thread-prefix: NettyBoss
      worker-thread-prefix: NettyServerNIOWorker
      server-executor-thread-prefix: NettyServerBizHandler
      share-boss-worker: false
      client-selector-thread-prefix: NettyClientSelector
      client-selector-thread-size: 1
      client-worker-thread-prefix: NettyClientWorkerThread
      boss-thread-size: 1
      worker-thread-size: 8
    shutdown:
      wait: 3
    serialization: seata
    compressor: none
  service:
    vgroup-mapping:
      # Maps the transaction group to the TC service name. Fixed: the value
      # was "seata-server", which matched neither the Eureka application name
      # (seata.registry.eureka.application above) nor the grouplist key below,
      # so the client could never locate the TC.
      fsp_tx_group: hatech-seata-server
    # TC address list, keyed by the vgroup-mapping value
    grouplist:
      hatech-seata-server: ${seata.address}
    enable-degrade: false
    disable-global-transaction: false
  client:
    rm:
      async-commit-buffer-limit: 10000
      table-meta-check-enable: false
      lock:
        retry-times: 30
        retry-interval: 10
      report-retry-count: 5
    undo:
      # Table used to store rollback (undo) records; must exist in each
      # business database
      log-table: undo_log
      data-validation: true
创建undo_log表
由于配置了用于回滚的undo_log表,所以需要在每个工程连接的数据库里都创建undo_log表。表名可以随意取,但要与配置文件中的配置对应。
-- the table to store seata xid data
-- Undo-log table for Seata AT mode; must exist in every business database.
-- Fixed: a bare DROP TABLE fails when the table does not yet exist, so the
-- script was not runnable on a fresh database; use IF EXISTS.
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
配置代理数据源
Seata的二阶段执行是通过拦截sql语句,分析语义来指定回滚策略,因此需要对DataSource做代理,每一个微服务项目都要配置
如果使用JdbcTemplate的配置
@Bean
public JdbcTemplate jdbcTemplate(DataSource dataSource) {
    // Wrap the raw DataSource in Seata's DataSourceProxy so SQL statements
    // are intercepted and undo logs are written for AT-mode rollback.
    DataSourceProxy proxiedSource = new DataSourceProxy(dataSource);
    return new JdbcTemplate(proxiedSource);
}
如果使用MybatisPlus的配置
@Bean
public SqlSessionFactory sqlSessionFactoryBean(DataSource dataSource) throws Exception {
    // mybatis-plus is on the classpath, so MybatisSqlSessionFactoryBean must
    // be used instead of the plain SqlSessionFactoryBean.
    MybatisSqlSessionFactoryBean factoryBean = new MybatisSqlSessionFactoryBean();
    // Proxy the DataSource so Seata can intercept SQL for AT-mode rollback.
    factoryBean.setDataSource(new DataSourceProxy(dataSource));
    return factoryBean.getObject();
}
如果用的是原生的mybatis,请使用SqlSessionFactoryBean
事务注解的使用
使用起来非常简单,在方法上加个@GlobalTransactional注解就行了
@GetMapping("/save")
@GlobalTransactional
public String save() {
    // Local branch: create the order in this service's database.
    this.orderService.save();
    // Remote branch: call the pay service; fixed: the return value was bound
    // to an unused local variable, so the assignment is dropped.
    this.restTemplate.getForObject("http://localhost:9000/save", String.class);
    // Deliberate ArithmeticException so the demo shows both branches above
    // being rolled back by @GlobalTransactional.
    int i = 10 / 0;
    return "success";
}
版本升级1.4.0
下载地址 https://github.com/seata/seata/releases/download/v1.4.0/seata-server-1.4.0.tar.gz
配置file.conf文件
以文件的方式存储为例
主要修改store段及file段以下属性:
mode = "file"、maxBranchSessionSize = 1120000、fileWriteBufferCacheSize = 1120000
[maxBranchSessionSize]: maxBranchSessionSize计算公式(单位为字节):【id字节数(32)】x【一次mapper方法可能操作的最大数据条数(5000)】x【一次业务逻辑最大可能操作业务的表的数量(7)】=1120000。如果一个事务中的主键总大小超过这个值(默认为16kb),会出现异常:Failed to store branch(这也就是前面所述的1.0.0版本中存在的问题)
[fileWriteBufferCacheSize]:建议maxBranchSessionSize与fileWriteBufferCacheSize相同
完整示例如下:
## transaction log store, only used in seata-server
store {
## store mode: file、db、redis
mode = "file"
## file store property
## maxBranchSessionSize formula (bytes): [id bytes (32)] * [max rows one mapper call may touch (5000)] * [max tables one business flow may touch (7)] = 1120000
## keep fileWriteBufferCacheSize equal to maxBranchSessionSize
## if the locked keys exceed 1120000 bytes the server throws "Failed to store branch"; raise these values (and server memory, e.g. to 1024MB) accordingly
file {
## store location dir
dir = "sessionStore"
# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
maxBranchSessionSize = 1120000
# globe session size , if exceeded throws exceptions
maxGlobalSessionSize = 512
# file buffer size , if exceeded allocate new buffer
fileWriteBufferCacheSize = 1120000
# when recover batch read size
sessionReloadReadSize = 100
# async, sync
flushDiskMode = async
}
}
修改registry.conf配置文件
以eureka为注册中心,主要是registry的配置,config的配置用file就行了
注意注册服务名的修改
# registry.conf for Seata server 1.4.0, using Eureka as the registry.
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "eureka"
# Strategy used to pick a server node when several are registered
loadBalance = "RandomLoadBalance"
loadBalanceVirtualNodes = 10
eureka {
serviceUrl = "http://localhost:8761/eureka"
# Service name the Seata server registers under; clients reference this
# name as their vgroup-mapping value
application = "seata-server"
weight = "1"
}
}
springboot整合
maven依赖
<!-- Seata distributed-transaction integration (Spring Cloud Alibaba) -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-seata</artifactId>
<exclusions>
<!-- exclude the transitive starter so the 1.4.0 version below wins -->
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Seata client pinned to 1.4.0 to match the upgraded server -->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.4.0</version>
<exclusions>
<exclusion>
<artifactId>protobuf-java</artifactId>
<groupId>com.google.protobuf</groupId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
</dependency>
yml配置
# Seata 1.4.0 client configuration
seata:
  address: 10.27.3.140:8091
  enabled: true
  application-id: ${spring.application.name}
  tx-service-group: istorm_tx_group
  client:
    tm:
      default-global-transaction-timeout: 60000
    undo:
      # Table used to store rollback (undo) records
      log-table: seata_log
  service:
    vgroup-mapping:
      # Fixed: comments here used "//", which is invalid YAML syntax; YAML
      # comments must start with "#". Key matches tx-service-group above;
      # value is the service name the Seata server registered under.
      istorm_tx_group: seata-server
    grouplist:
      # Key matches the vgroup-mapping value ("seata-server") above
      seata-server: ${seata.address}
  registry:
    type: eureka
    eureka:
      # Fixed: the placeholder referenced vgroup-mapping.fsp_tx_group, a key
      # that does not exist in this file (the group is istorm_tx_group), so
      # resolution would fail at startup.
      application: ${seata.service.vgroup-mapping.istorm_tx_group}
      service-url: ${eureka.client.service-url.defaultZone}