数据库初始化工具
- 创建模块
添加依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jdbc</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
yml配置连接数据库
spring:
datasource:
url: jdbc:mysql:///?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: root
创建sql脚本文件
在resource资源文件夹下创建sql目录,在sql目录中创建SQL脚本文件
- storage.sql
- seata-server.sql
- order.sql
- account.sql
- 在启动类中执行sql脚本
@PostConstruct
注解作用:ScriptUtils.executeSqlScript()
是Spring提供的 jdbc 脚本执行器@SpringBootApplication
public class DbInitApplication {
@Autowired
private DataSource dataSource;
public static void main(String[] args) {
SpringApplication.run(DbInitApplication.class, args);
}
@PostConstruct
public void dbInit() throws Exception {
exec("sql/account.sql");
exec("sql/order.sql");
exec("sql/seata-server.sql");
exec("sql/storage.sql");
}
private void exec(String sql) throws Exception {
ClassPathResource pathResource = new ClassPathResource(sql, DbInitApplication.class.getClassLoader());
EncodedResource resource = new EncodedResource(pathResource, StandardCharsets.UTF_8);
//spring jdbc 提供的一个工具用户执行sql脚本文件
ScriptUtils.executeSqlScript(dataSource.getConnection(), resource);
}
}
Seata AT事务
部署seata server(TC事务协调器)
配置
- 修改registry.conf —向注册中心注册,将
type="file"
改为type="eureka"
,注册至eureka的服务名改为自定义服务名 ```java registry {file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = “eureka”
eureka { serviceUrl = “http://localhost:8761/eureka“ application = “seata-server” weight = “1” }
- file.conf -- 协调器运行过程中记录的日志数据,要存到数据库,设置`mode="db"`,使用数据库存储日志数据,设置mysql连接和用户名密码以及存储数据的表名
```java
mode = "db"
publicKey = ""
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.
datasource = "druid"
## mysql/oracle/postgresql/h2/oceanbase etc.
dbType = "mysql"
driverClassName = "com.mysql.cj.jdbc.Driver"
## if using mysql to store the data, recommend add rewriteBatchedStatements=true in jdbc connection param
url = "jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8"
user = "root"
password = "root"
minConn = 5
maxConn = 30
globalTable = "global_table"
branchTable = "branch_table"
lockTable = "lock_table"
queryLimit = 100
maxWait = 5000
}
- seata-server.bat — 使用的内存默认 2G,测试环境把内存改小:256m,修改第85行数据
%JAVACMD% %JAVA_OPTS% -server -Xmx256m -Xms256m -Xmn128m -Xss512k -XX:SurvivorRatio=10 -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=256m -XX:MaxDirectMemorySize=1024m -XX:-OmitStackTraceInFastThrow -XX:-UseAdaptiveSizePolicy -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath="%BASEDIR%"/logs/java_heapdump.hprof -XX:+DisableExplicitGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=75 -Xloggc:"%BASEDIR%"/logs/seata_gc.log -verbose:gc -Dio.netty.leakDetectionLevel=advanced -Dlogback.color.disable-for-bat=true -classpath %CLASSPATH% -Dapp.name="seata-server" -Dapp.repo="%REPO%" -Dapp.home="%BASEDIR%" -Dbasedir="%BASEDIR%" io.seata.server.Server %CMD_LINE_ARGS%
- 修改registry.conf —向注册中心注册,将
-
在无事务的项目中添加Seata AT事务
父项目中添加seata依赖
order中配置三个配置文件
- application.yml—配置事务组的组名 ```yaml spring: application: name: order
cloud: alibaba: seata:
tx-service-group: order_tx_group #表示当前模块在那个事务组中
```
- registry.conf—-注册中心地址
```yaml
registry {
file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = “eureka”
nacos { serverAddr = “localhost” namespace = “” cluster = “default” } eureka {
连接eureka,从注册表中获取seata-server
serviceUrl = “http://localhost:8761/eureka“
application = “default”
weight = “1”
} redis { serverAddr = “localhost:6379” db = “0” password = “” cluster = “default” timeout = “0” } zk { cluster = “default” serverAddr = “127.0.0.1:2181” session.timeout = 6000 connect.timeout = 2000 username = “” password = “” } consul { cluster = “default” serverAddr = “127.0.0.1:8500” } etcd3 { cluster = “default” serverAddr = “http://localhost:2379“ } sofa { serverAddr = “127.0.0.1:9603” application = “default” region = “DEFAULT_ZONE” datacenter = “DefaultDataCenter” cluster = “default” group = “SEATA_GROUP” addressWaitTime = “3000” } file { name = “file.conf” } }
config {
file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
type = “file”
nacos { serverAddr = “localhost” namespace = “” group = “SEATA_GROUP” } consul { serverAddr = “127.0.0.1:8500” } apollo { app.id = “seata-server” apollo.meta = “http://192.168.1.204:8801“ namespace = “application” } zk { serverAddr = “127.0.0.1:2181” session.timeout = 6000 connect.timeout = 2000 username = “” password = “” } etcd3 { serverAddr = “http://localhost:2379“ } file { name = “file.conf” } }
- file.conf---配置事务组,使用的协调器
```yaml
transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
# the client batch send request enable
enableClientBatchSendRequest = true
#thread factory for netty
threadFactory {
bossThreadPrefix = "NettyBoss"
workerThreadPrefix = "NettyServerNIOWorker"
serverExecutorThread-prefix = "NettyServerBizHandler"
shareBossWorker = false
clientSelectorThreadPrefix = "NettyClientSelector"
clientSelectorThreadSize = 1
clientWorkerThreadPrefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
bossThreadSize = 1
#auto default pin or 8
workerThreadSize = "default"
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
service {
#transaction service group mapping
# order_tx_group 与 yml 中的 “tx-service-group: order_tx_group” 配置一致
# “seata-server” 与 TC 服务器的注册名一致
# 从eureka获取seata-server的地址,再向seata-server注册自己,设置group
vgroupMapping.order_tx_group = "seata-server"
#only support when registry.type=file, please don't set multiple addresses
order_tx_group.grouplist = "127.0.0.1:8091"
#degrade, current not support
enableDegrade = false
#disable seata
disableGlobalTransaction = false
}
client {
rm {
asyncCommitBufferLimit = 10000
lock {
retryInterval = 10
retryTimes = 30
retryPolicyBranchRollbackOnConflict = true
}
reportRetryCount = 5
tableMetaCheckEnable = false
reportSuccessEnable = false
}
tm {
commitRetryCount = 5
rollbackRetryCount = 5
}
undo {
dataValidation = true
logSerialization = "jackson"
logTable = "undo_log"
}
log {
exceptionRate = 100
}
}
创建自动配置类,创建DatasourceProxy数据源代理对象
@Configuration
public class DataSourceAutoConfiguration {
/**
* 新建原始数据源对象
*/
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource dataSource() {
return new HikariDataSource();
}
/**
* 新建数据源代理对象
* Primary注解设置首选对象
*/
@Primary
@Bean
public DataSource dataSourceProxy(DataSource dataSource) {
return new DataSourceProxy(dataSource);
}
}
- AT事务的自动事务处理代码,由数据源代理提供
- 禁用spring默认数据源配置,使用自定义的数据源配置(DataSourceAutoConfiguration)
禁用exclude = DataSourceAutoConfiguration.class
@EnableFeignClients
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class OrderApplication {
public static void main(String[] args) {
SpringApplication.run(OrderApplication.class, args);
}
}
在业务方法上添加事务注解
- spring本地事务添加
@Transactionel
seata全局事务
@GlobalTractionel
@Transactional
@GlobalTransactional
@Override
public void createOrder(Order order) {
// TODO:远程调用发号器,生成订单id
String id = easyIdClient.nextId("order_business");
orderMapper.createOrder(order.setId(Long.valueOf(id)));
// TODO:远程调用库存服务,减少库存
//storageClient.decrease(order.getProductId(), order.getCount());
// TODO:远程调用账户服务,减少账户金额
//accountClient.decrease(order.getUserId(), order.getMoney());
}
- spring本地事务添加
对库存(Storage)和账户(account)服务执行上述操作并访问:http://localhost:8083/create?userId=1&productId=1&count=10&money=100 进行测试
TCC事务
TCC是Try、Confirm、Cancel三个词语的缩写,TCC要求每个分支事务实现三个操作 :预处理Try、确认Confirm、撤销Cancel。Try操作做业务检查及资源预留,Confirm做业务确认操作,Cancel实现一个与Try相反的操作既回滚操作。TM首先发起所有的分支事务的try操作,任何一个分支事务的try操作执行失败,TM将会发起所有分支事务的Cancel操作,若try操作全部成功,TM将会发起所有分支事务的Confirm操作,其中Confirm/Cancel操作若执行失败,TM会进行重试。
- TCC事务对代码侵入严重
每个阶段的数据操作都要自己进行编码来实现,事务框架无法自动处理。
- TCC 效率更高
不必对数据加全局锁,允许多个事务同时操作数据。 - 底层数据库表结构需要进行修改
AT事务适用于大部分业务场景,在一些复杂情况下,自动事务无法进行自动处理,需要手动处理事务
TCC事务两个阶段的三个操作
- 一阶段
- Try—预留资源,冻结数据
二阶段
新建工程导入无事务版本项目
- 添加tcc事务
- 添加seata依赖
修改三个配置文件
- application.yml——添加事务组 ```yaml spring: application: name: storage
datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/seata_storage?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8 username: root password: root
cloud: alibaba:
seata:
tx-service-group: order_tx_group
```
- file.conf——事务组对应协调器
- registry.conf——添加注册中心地址
- 修改Mapper,添加TCC数据库操作
dao接口
@Mapper
public interface OrderMapper extends BaseMapper<Order> {
/**
* 新建订单
*
* @param order 订单信息
*/
void createOrder(Order order);
/**
* 创建冻结订单
*
* @param order 订单信息
*/
void createFrozen(Order order);
/**
* 修改订单状态
*
* @param orderId 订单编号
* @param status 订单状态
*/
void updateStatus(Long orderId, Integer status);
//删除订单使用Mybatis-Plus
}
mapper
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.tedu.order.mapper.OrderMapper">
<resultMap id="BaseResultMap" type="Order">
<id column="id" property="id" jdbcType="BIGINT"/>
<result column="user_id" property="userId" jdbcType="BIGINT"/>
<result column="product_id" property="productId" jdbcType="BIGINT"/>
<result column="count" property="count" jdbcType="INTEGER"/>
<result column="money" property="money" jdbcType="DECIMAL"/>
<result column="status" property="status" jdbcType="INTEGER"/>
</resultMap>
<insert id="createOrder">
INSERT INTO `order` (`id`, `user_id`, `product_id`, `count`, `money`, `status`)
VALUES (#{id}, #{userId}, #{productId}, #{count}, #{money}, 1);
</insert>
<insert id="createFrozen">
INSERT INTO `order` (`id`, `user_id`, `product_id`, `count`, `money`, `status`)
VALUES (#{id}, #{userId}, #{productId}, #{count}, #{money}, 0);
</insert>
<update id="updateStatus">
update `order`
set status = #{status}
where id = #{orderId}
</update>
<delete id="deleteById">
delete
from `order`
where id = #{orederId}
</delete>
</mapper>
按照seata tcc的实现规则,定义TccAction接口和实现
添加三个方法,实现TCC三个操作
@LocalTCC
public interface OrderTccAction {
/**
* 第一阶段的try
* TwoPhaseBusinessAction注解用于定义一个TCC接口,添加在try方法上
*
* @param businessActionContext 业务操作上下文对象,用于在两个阶段进行数据传递
* @param id 订单编号
* @param userId 用户id
* @param productId 产品id
* @param count 产品数量
* @param money 金额
* @return 是否成功
*/
@TwoPhaseBusinessAction(name = "OrderTccAction")
boolean prepare(BusinessActionContext businessActionContext,
@BusinessActionContextParameter(paramName = "orderId") Long id,
Long userId,
Long productId,
Integer count,
BigDecimal money);
/**
* 第二阶段的confirm
*
* @param businessActionContext 业务操作上下文对象,用于在两个阶段进行数据传递
* @return 是否成功
*/
boolean commit(BusinessActionContext businessActionContext);
/**
* 第二阶段的cancel
*
* @param businessActionContext 业务操作上下文对象,用于在两个阶段进行数据传递
* @return 是否成功
*/
boolean rollback(BusinessActionContext businessActionContext);
}
三个方法添加
@Transactional
注解控制本地事务@Component
public class OrderTccActionImpl implements OrderTccAction {
private final OrderMapper orderMapper;
@Autowired
public OrderTccActionImpl(OrderMapper orderMapper) {
this.orderMapper = orderMapper;
}
@Transactional
@Override
public boolean prepare(BusinessActionContext businessActionContext,
Long id, Long userId,
Long productId, Integer count, BigDecimal money) {
orderMapper.createOrder(new Order(id, userId, productId, count, money, 0));
return true;
}
@Transactional
@Override
public boolean commit(BusinessActionContext businessActionContext) {
Long orderId = Long.valueOf(businessActionContext.getActionContext("orderId").toString());
orderMapper.updateStatus(orderId, 1);
return true;
}
@Transactional
@Override
public boolean rollback(BusinessActionContext businessActionContext) {
Long orderId = Long.valueOf(businessActionContext.getActionContext("orderId").toString());
orderMapper.deleteById(orderId);
return true;
}
}
修改业务方法,调用TCCAction的第一阶段方法
- 第二阶段的方法有seata的RM组件自动调用
-
修改库存服务
添加配置文件
修改yml配置,添加事务组:
server:
port: 8082
spring:
application:
name: storage
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/seata_storage?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8
username: root
password: root
cloud:
alibaba:
seata:
tx-service-group: order_tx_group
eureka:
client:
service-url:
defaultZone: http://localhost:8761/eureka
instance:
prefer-ip-address: true
instance-id: ${spring.cloud.client.ip-address}:${spring.application.name}:${server.port}
mybatis-plus:
type-aliases-package: cn.tedu.storage.entity
mapper-locations: classpath:/mapper/*.xml
configuration:
map-underscore-to-camel-case: true
logging:
level:
cn.tedu.storage.mapper: DEBUG
在resource文件夹下添加seata配置文件
registry.conf:
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "eureka"
nacos {
serverAddr = "localhost"
namespace = ""
cluster = "default"
}
eureka {
# 连接eureka,要从注册表发现 seata-server
serviceUrl = "http://localhost:8761/eureka"
# application = "default"
# weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = "0"
password = ""
cluster = "default"
timeout = "0"
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
username = ""
password = ""
}
consul {
cluster = "default"
serverAddr = "127.0.0.1:8500"
}
etcd3 {
cluster = "default"
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
application = "default"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
cluster = "default"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
type = "file"
nacos {
serverAddr = "localhost"
namespace = ""
group = "SEATA_GROUP"
}
consul {
serverAddr = "127.0.0.1:8500"
}
apollo {
app.id = "seata-server"
apollo.meta = "http://192.168.1.204:8801"
namespace = "application"
}
zk {
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
username = ""
password = ""
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
name = "file.conf"
}
}
file.conf
transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
# the client batch send request enable
enableClientBatchSendRequest = true
#thread factory for netty
threadFactory {
bossThreadPrefix = "NettyBoss"
workerThreadPrefix = "NettyServerNIOWorker"
serverExecutorThread-prefix = "NettyServerBizHandler"
shareBossWorker = false
clientSelectorThreadPrefix = "NettyClientSelector"
clientSelectorThreadSize = 1
clientWorkerThreadPrefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
bossThreadSize = 1
#auto default pin or 8
workerThreadSize = "default"
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
service {
#transaction service group mapping
# order_tx_group 与 yml 中的 “tx-service-group: order_tx_group” 配置一致
# “seata-server” 与 TC 服务器的注册名一致
# 从eureka获取seata-server的地址,再向seata-server注册自己,设置group
# order_tx_group 事务组,对应使用哪个协调器
# seata-server 是注册表中的服务id
vgroupMapping.order_tx_group = "seata-server"
#only support when registry.type=file, please don't set multiple addresses
order_tx_group.grouplist = "127.0.0.1:8091"
#degrade, current not support
enableDegrade = false
#disable seata
disableGlobalTransaction = false
}
client {
rm {
asyncCommitBufferLimit = 10000
lock {
retryInterval = 10
retryTimes = 30
retryPolicyBranchRollbackOnConflict = true
}
reportRetryCount = 5
tableMetaCheckEnable = false
reportSuccessEnable = false
}
tm {
commitRetryCount = 5
rollbackRetryCount = 5
}
undo {
dataValidation = true
logSerialization = "jackson"
logTable = "undo_log"
}
log {
exceptionRate = 100
}
}
- 修改Mapper,添加TCC事务数据库操作
定义Mapper接口方法:
@Mapper
public interface StorageMapper extends BaseMapper<Storage> {
/**
* 减少库存
* @param productId 商品id
* @param count 要减少的商品数量
*/
void decrease(Long productId, Integer count);
/**
* 查询库存,判断有无足够库存
* @param productId 产品id
* @return 库存信息
*/
Storage findByProductId(Long productId);
/**
* 冻结库存
* @param productId 产品id
* @param count 数量
*/
void updateResidueToFrozen(Long productId,Integer count);
void updateFrozenToUsed(Long productId,Integer count);
void updateFrozenToResidue(Long productId,Integer count);
}
创建xml配置操作数据库:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.tedu.storage.mapper.StorageMapper">
<resultMap id="BaseResultMap" type="Storage">
<id column="id" property="id" jdbcType="BIGINT"/>
<result column="product_id" property="productId" jdbcType="BIGINT"/>
<result column="total" property="total" jdbcType="INTEGER"/>
<result column="used" property="used" jdbcType="INTEGER"/>
<result column="residue" property="residue" jdbcType="INTEGER"/>
</resultMap>
<update id="decrease">
UPDATE storage
SET used = used + #{count},
residue = residue - #{count}
WHERE product_id = #{productId}
</update>
<select id="findByProductId" resultMap="BaseResultMap">
select *
from storage
where id = #{productId};
</select>
<update id="updateResidueToFrozen">
update storage
set residue = residue - #{count},
frozen = frozen + #{count}
where product_id = #{productId}
</update>
<update id="updateFrozenToUsed">
update storage
set frozen = frozen - #{count},
used = used + #{count}
where product_id = #{productId}
</update>
<update id="updateFrozenToResidue">
update storage
set frozen = frozen - #{count},
residue = residue + #{count}
where product_id = #{productId}
</update>
</mapper>
- 定义TCC接口和实现
定义TCC接口:
@LocalTCC
public interface StorageTccAction {
@TwoPhaseBusinessAction(name = "StorageTccAction")
boolean prepare(BusinessActionContext businessActionContext,
@BusinessActionContextParameter(paramName = "productId") Long productId,
@BusinessActionContextParameter(paramName = "count") Integer count);
boolean commit(BusinessActionContext businessActionContext);
boolean rollback(BusinessActionContext businessActionContext);
}
定义TCC接口实现方法:
@Component
public class StorageTccActionImpl implements StorageTccAction {
private final StorageMapper storageMapper;
@Autowired
public StorageTccActionImpl(StorageMapper storageMapper) {
this.storageMapper = storageMapper;
}
@Transactional
@Override
public boolean prepare(BusinessActionContext businessActionContext, Long productId, Integer count) {
Storage storage = storageMapper.findByProductId(productId);
if (storage.getResidue() < count) {
throw new RuntimeException("库存不足");
}
//冻结库存
storageMapper.updateResidueToFrozen(productId, count);
return true;
}
@Transactional
@Override
public boolean commit(BusinessActionContext businessActionContext) {
Long productId = Long.valueOf(businessActionContext.getActionContext("productId").toString());
Integer count = Integer.valueOf(businessActionContext.getActionContext("count").toString());
storageMapper.updateFrozenToUsed(productId, count);
return true;
}
@Transactional
@Override
public boolean rollback(BusinessActionContext businessActionContext) {
Long productId = Long.valueOf(businessActionContext.getActionContext("productId").toString());
Integer count = Integer.valueOf(businessActionContext.getActionContext("count").toString());
storageMapper.updateFrozenToResidue(productId, count);
return true;
}
}
调用TCC事务的Try方法
@Service
public class StorageImpl implements StorageService {
private final StorageTccAction storageTccAction;
@Autowired
public StorageImpl(StorageTccAction storageTccAction) {
this.storageTccAction = storageTccAction;
}
@Override
public void decrease(Long productId, Integer count) {
storageTccAction.prepare(null, productId, count);
}
}