数据库初始化工具

- 创建模块
添加依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jdbc</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
yml配置连接数据库
spring:datasource:url: jdbc:mysql:///?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8driver-class-name: com.mysql.cj.jdbc.Driverusername: rootpassword: root
创建sql脚本文件
在resource资源文件夹下创建sql目录,在sql目录中创建SQL脚本文件
- storage.sql
- seata-server.sql
- order.sql
- account.sql
- 在启动类中执行sql脚本
@PostConstruct注解作用:ScriptUtils.executeSqlScript()是Spring提供的 jdbc 脚本执行器@SpringBootApplicationpublic class DbInitApplication {@Autowiredprivate DataSource dataSource;public static void main(String[] args) {SpringApplication.run(DbInitApplication.class, args);}@PostConstructpublic void dbInit() throws Exception {exec("sql/account.sql");exec("sql/order.sql");exec("sql/seata-server.sql");exec("sql/storage.sql");}private void exec(String sql) throws Exception {ClassPathResource pathResource = new ClassPathResource(sql, DbInitApplication.class.getClassLoader());EncodedResource resource = new EncodedResource(pathResource, StandardCharsets.UTF_8);//spring jdbc 提供的一个工具用户执行sql脚本文件ScriptUtils.executeSqlScript(dataSource.getConnection(), resource);}}
Seata AT事务
部署seata server(TC事务协调器)
配置
- 修改registry.conf —向注册中心注册,将
type="file"改为type="eureka",注册至eureka的服务名改为自定义服务名 ```java registry {file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = “eureka”
eureka { serviceUrl = “http://localhost:8761/eureka“ application = “seata-server” weight = “1” }
- file.conf -- 协调器运行过程中记录的日志数据,要存到数据库,设置`mode="db"`,使用数据库存储日志数据,设置mysql连接和用户名密码以及存储数据的表名```javamode = "db"publicKey = ""db {## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.datasource = "druid"## mysql/oracle/postgresql/h2/oceanbase etc.dbType = "mysql"driverClassName = "com.mysql.cj.jdbc.Driver"## if using mysql to store the data, recommend add rewriteBatchedStatements=true in jdbc connection paramurl = "jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8"user = "root"password = "root"minConn = 5maxConn = 30globalTable = "global_table"branchTable = "branch_table"lockTable = "lock_table"queryLimit = 100maxWait = 5000}
- seata-server.bat — 使用的内存默认 2G,测试环境把内存改小:256m,修改第85行数据
%JAVACMD% %JAVA_OPTS% -server -Xmx256m -Xms256m -Xmn128m -Xss512k -XX:SurvivorRatio=10 -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=256m -XX:MaxDirectMemorySize=1024m -XX:-OmitStackTraceInFastThrow -XX:-UseAdaptiveSizePolicy -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath="%BASEDIR%"/logs/java_heapdump.hprof -XX:+DisableExplicitGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=75 -Xloggc:"%BASEDIR%"/logs/seata_gc.log -verbose:gc -Dio.netty.leakDetectionLevel=advanced -Dlogback.color.disable-for-bat=true -classpath %CLASSPATH% -Dapp.name="seata-server" -Dapp.repo="%REPO%" -Dapp.home="%BASEDIR%" -Dbasedir="%BASEDIR%" io.seata.server.Server %CMD_LINE_ARGS%
- 修改registry.conf —向注册中心注册,将
-
在无事务的项目中添加Seata AT事务
父项目中添加seata依赖
order中配置三个配置文件
- application.yml—配置事务组的组名 ```yaml spring: application: name: order
cloud: alibaba: seata:
tx-service-group: order_tx_group #表示当前模块在那个事务组中
```
- registry.conf—-注册中心地址
```yaml
registry {
file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = “eureka”
nacos { serverAddr = “localhost” namespace = “” cluster = “default” } eureka {
连接eureka,从注册表中获取seata-server
serviceUrl = “http://localhost:8761/eureka“
application = “default”
weight = “1”
} redis { serverAddr = “localhost:6379” db = “0” password = “” cluster = “default” timeout = “0” } zk { cluster = “default” serverAddr = “127.0.0.1:2181” session.timeout = 6000 connect.timeout = 2000 username = “” password = “” } consul { cluster = “default” serverAddr = “127.0.0.1:8500” } etcd3 { cluster = “default” serverAddr = “http://localhost:2379“ } sofa { serverAddr = “127.0.0.1:9603” application = “default” region = “DEFAULT_ZONE” datacenter = “DefaultDataCenter” cluster = “default” group = “SEATA_GROUP” addressWaitTime = “3000” } file { name = “file.conf” } }
config {
file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
type = “file”
nacos { serverAddr = “localhost” namespace = “” group = “SEATA_GROUP” } consul { serverAddr = “127.0.0.1:8500” } apollo { app.id = “seata-server” apollo.meta = “http://192.168.1.204:8801“ namespace = “application” } zk { serverAddr = “127.0.0.1:2181” session.timeout = 6000 connect.timeout = 2000 username = “” password = “” } etcd3 { serverAddr = “http://localhost:2379“ } file { name = “file.conf” } }
- file.conf---配置事务组,使用的协调器```yamltransport {# tcp udt unix-domain-sockettype = "TCP"#NIO NATIVEserver = "NIO"#enable heartbeatheartbeat = true# the client batch send request enableenableClientBatchSendRequest = true#thread factory for nettythreadFactory {bossThreadPrefix = "NettyBoss"workerThreadPrefix = "NettyServerNIOWorker"serverExecutorThread-prefix = "NettyServerBizHandler"shareBossWorker = falseclientSelectorThreadPrefix = "NettyClientSelector"clientSelectorThreadSize = 1clientWorkerThreadPrefix = "NettyClientWorkerThread"# netty boss thread size,will not be used for UDTbossThreadSize = 1#auto default pin or 8workerThreadSize = "default"}shutdown {# when destroy server, wait secondswait = 3}serialization = "seata"compressor = "none"}service {#transaction service group mapping# order_tx_group 与 yml 中的 “tx-service-group: order_tx_group” 配置一致# “seata-server” 与 TC 服务器的注册名一致# 从eureka获取seata-server的地址,再向seata-server注册自己,设置groupvgroupMapping.order_tx_group = "seata-server"#only support when registry.type=file, please don't set multiple addressesorder_tx_group.grouplist = "127.0.0.1:8091"#degrade, current not supportenableDegrade = false#disable seatadisableGlobalTransaction = false}client {rm {asyncCommitBufferLimit = 10000lock {retryInterval = 10retryTimes = 30retryPolicyBranchRollbackOnConflict = true}reportRetryCount = 5tableMetaCheckEnable = falsereportSuccessEnable = false}tm {commitRetryCount = 5rollbackRetryCount = 5}undo {dataValidation = truelogSerialization = "jackson"logTable = "undo_log"}log {exceptionRate = 100}}
创建自动配置类,创建DatasourceProxy数据源代理对象
@Configurationpublic class DataSourceAutoConfiguration {/*** 新建原始数据源对象*/@Bean@ConfigurationProperties(prefix = "spring.datasource")public DataSource dataSource() {return new HikariDataSource();}/*** 新建数据源代理对象* Primary注解设置首选对象*/@Primary@Beanpublic DataSource dataSourceProxy(DataSource dataSource) {return new DataSourceProxy(dataSource);}}
- AT事务的自动事务处理代码,由数据源代理提供
- 禁用spring默认数据源配置,使用自定义的数据源配置(DataSourceAutoConfiguration)
禁用exclude = DataSourceAutoConfiguration.class
@EnableFeignClients@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)public class OrderApplication {public static void main(String[] args) {SpringApplication.run(OrderApplication.class, args);}}
在业务方法上添加事务注解
- spring本地事务添加
@Transactionel seata全局事务
@GlobalTractionel@Transactional@GlobalTransactional@Overridepublic void createOrder(Order order) {// TODO:远程调用发号器,生成订单idString id = easyIdClient.nextId("order_business");orderMapper.createOrder(order.setId(Long.valueOf(id)));// TODO:远程调用库存服务,减少库存//storageClient.decrease(order.getProductId(), order.getCount());// TODO:远程调用账户服务,减少账户金额//accountClient.decrease(order.getUserId(), order.getMoney());}
- spring本地事务添加
对库存(Storage)和账户(account)服务执行上述操作并访问:http://localhost:8083/create?userId=1&productId=1&count=10&money=100 进行测试
TCC事务
TCC是Try、Confirm、Cancel三个词语的缩写,TCC要求每个分支事务实现三个操作 :预处理Try、确认Confirm、撤销Cancel。Try操作做业务检查及资源预留,Confirm做业务确认操作,Cancel实现一个与Try相反的操作既回滚操作。TM首先发起所有的分支事务的try操作,任何一个分支事务的try操作执行失败,TM将会发起所有分支事务的Cancel操作,若try操作全部成功,TM将会发起所有分支事务的Confirm操作,其中Confirm/Cancel操作若执行失败,TM会进行重试。
- TCC事务对代码侵入严重
每个阶段的数据操作都要自己进行编码来实现,事务框架无法自动处理。
- TCC 效率更高
不必对数据加全局锁,允许多个事务同时操作数据。 - 底层数据库表结构需要进行修改
AT事务适用于大部分业务场景,在一些复杂情况下,自动事务无法进行自动处理,需要手动处理事务
TCC事务两个阶段的三个操作
- 一阶段
- Try—预留资源,冻结数据
二阶段
新建工程导入无事务版本项目
- 添加tcc事务
- 添加seata依赖
修改三个配置文件
- application.yml——添加事务组 ```yaml spring: application: name: storage
datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/seata_storage?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8 username: root password: root
cloud: alibaba:
seata:tx-service-group: order_tx_group
```
- file.conf——事务组对应协调器
- registry.conf——添加注册中心地址
- 修改Mapper,添加TCC数据库操作
dao接口
@Mapperpublic interface OrderMapper extends BaseMapper<Order> {/*** 新建订单** @param order 订单信息*/void createOrder(Order order);/*** 创建冻结订单** @param order 订单信息*/void createFrozen(Order order);/*** 修改订单状态** @param orderId 订单编号* @param status 订单状态*/void updateStatus(Long orderId, Integer status);//删除订单使用Mybatis-Plus}
mapper
<?xml version="1.0" encoding="UTF-8" ?><!DOCTYPE mapperPUBLIC "-//mybatis.org//DTD Mapper 3.0//EN""http://mybatis.org/dtd/mybatis-3-mapper.dtd"><mapper namespace="cn.tedu.order.mapper.OrderMapper"><resultMap id="BaseResultMap" type="Order"><id column="id" property="id" jdbcType="BIGINT"/><result column="user_id" property="userId" jdbcType="BIGINT"/><result column="product_id" property="productId" jdbcType="BIGINT"/><result column="count" property="count" jdbcType="INTEGER"/><result column="money" property="money" jdbcType="DECIMAL"/><result column="status" property="status" jdbcType="INTEGER"/></resultMap><insert id="createOrder">INSERT INTO `order` (`id`, `user_id`, `product_id`, `count`, `money`, `status`)VALUES (#{id}, #{userId}, #{productId}, #{count}, #{money}, 1);</insert><insert id="createFrozen">INSERT INTO `order` (`id`, `user_id`, `product_id`, `count`, `money`, `status`)VALUES (#{id}, #{userId}, #{productId}, #{count}, #{money}, 0);</insert><update id="updateStatus">update `order`set status = #{status}where id = #{orderId}</update><delete id="deleteById">deletefrom `order`where id = #{orederId}</delete></mapper>
按照seata tcc的实现规则,定义TccAction接口和实现
添加三个方法,实现TCC三个操作
@LocalTCCpublic interface OrderTccAction {/*** 第一阶段的try* TwoPhaseBusinessAction注解用于定义一个TCC接口,添加在try方法上** @param businessActionContext 业务操作上下文对象,用于在两个阶段进行数据传递* @param id 订单编号* @param userId 用户id* @param productId 产品id* @param count 产品数量* @param money 金额* @return 是否成功*/@TwoPhaseBusinessAction(name = "OrderTccAction")boolean prepare(BusinessActionContext businessActionContext,@BusinessActionContextParameter(paramName = "orderId") Long id,Long userId,Long productId,Integer count,BigDecimal money);/*** 第二阶段的confirm** @param businessActionContext 业务操作上下文对象,用于在两个阶段进行数据传递* @return 是否成功*/boolean commit(BusinessActionContext businessActionContext);/*** 第二阶段的cancel** @param businessActionContext 业务操作上下文对象,用于在两个阶段进行数据传递* @return 是否成功*/boolean rollback(BusinessActionContext businessActionContext);}
三个方法添加
@Transactional注解控制本地事务@Componentpublic class OrderTccActionImpl implements OrderTccAction {private final OrderMapper orderMapper;@Autowiredpublic OrderTccActionImpl(OrderMapper orderMapper) {this.orderMapper = orderMapper;}@Transactional@Overridepublic boolean prepare(BusinessActionContext businessActionContext,Long id, Long userId,Long productId, Integer count, BigDecimal money) {orderMapper.createOrder(new Order(id, userId, productId, count, money, 0));return true;}@Transactional@Overridepublic boolean commit(BusinessActionContext businessActionContext) {Long orderId = Long.valueOf(businessActionContext.getActionContext("orderId").toString());orderMapper.updateStatus(orderId, 1);return true;}@Transactional@Overridepublic boolean rollback(BusinessActionContext businessActionContext) {Long orderId = Long.valueOf(businessActionContext.getActionContext("orderId").toString());orderMapper.deleteById(orderId);return true;}}
修改业务方法,调用TCCAction的第一阶段方法
- 第二阶段的方法有seata的RM组件自动调用
-
修改库存服务
添加配置文件
修改yml配置,添加事务组:
server:port: 8082spring:application:name: storagedatasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/seata_storage?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8username: rootpassword: rootcloud:alibaba:seata:tx-service-group: order_tx_groupeureka:client:service-url:defaultZone: http://localhost:8761/eurekainstance:prefer-ip-address: trueinstance-id: ${spring.cloud.client.ip-address}:${spring.application.name}:${server.port}mybatis-plus:type-aliases-package: cn.tedu.storage.entitymapper-locations: classpath:/mapper/*.xmlconfiguration:map-underscore-to-camel-case: truelogging:level:cn.tedu.storage.mapper: DEBUG
在resource文件夹下添加seata配置文件
registry.conf:
registry {# file 、nacos 、eureka、redis、zk、consul、etcd3、sofatype = "eureka"nacos {serverAddr = "localhost"namespace = ""cluster = "default"}eureka {# 连接eureka,要从注册表发现 seata-serverserviceUrl = "http://localhost:8761/eureka"# application = "default"# weight = "1"}redis {serverAddr = "localhost:6379"db = "0"password = ""cluster = "default"timeout = "0"}zk {cluster = "default"serverAddr = "127.0.0.1:2181"session.timeout = 6000connect.timeout = 2000username = ""password = ""}consul {cluster = "default"serverAddr = "127.0.0.1:8500"}etcd3 {cluster = "default"serverAddr = "http://localhost:2379"}sofa {serverAddr = "127.0.0.1:9603"application = "default"region = "DEFAULT_ZONE"datacenter = "DefaultDataCenter"cluster = "default"group = "SEATA_GROUP"addressWaitTime = "3000"}file {name = "file.conf"}}config {# file、nacos 、apollo、zk、consul、etcd3、springCloudConfigtype = "file"nacos {serverAddr = "localhost"namespace = ""group = "SEATA_GROUP"}consul {serverAddr = "127.0.0.1:8500"}apollo {app.id = "seata-server"apollo.meta = "http://192.168.1.204:8801"namespace = "application"}zk {serverAddr = "127.0.0.1:2181"session.timeout = 6000connect.timeout = 2000username = ""password = ""}etcd3 {serverAddr = "http://localhost:2379"}file {name = "file.conf"}}
file.conf
transport {# tcp udt unix-domain-sockettype = "TCP"#NIO NATIVEserver = "NIO"#enable heartbeatheartbeat = true# the client batch send request enableenableClientBatchSendRequest = true#thread factory for nettythreadFactory {bossThreadPrefix = "NettyBoss"workerThreadPrefix = "NettyServerNIOWorker"serverExecutorThread-prefix = "NettyServerBizHandler"shareBossWorker = falseclientSelectorThreadPrefix = "NettyClientSelector"clientSelectorThreadSize = 1clientWorkerThreadPrefix = "NettyClientWorkerThread"# netty boss thread size,will not be used for UDTbossThreadSize = 1#auto default pin or 8workerThreadSize = "default"}shutdown {# when destroy server, wait secondswait = 3}serialization = "seata"compressor = "none"}service {#transaction service group mapping# order_tx_group 与 yml 中的 “tx-service-group: order_tx_group” 配置一致# “seata-server” 与 TC 服务器的注册名一致# 从eureka获取seata-server的地址,再向seata-server注册自己,设置group# order_tx_group 事务组,对应使用哪个协调器# seata-server 是注册表中的服务idvgroupMapping.order_tx_group = "seata-server"#only support when registry.type=file, please don't set multiple addressesorder_tx_group.grouplist = "127.0.0.1:8091"#degrade, current not supportenableDegrade = false#disable seatadisableGlobalTransaction = false}client {rm {asyncCommitBufferLimit = 10000lock {retryInterval = 10retryTimes = 30retryPolicyBranchRollbackOnConflict = true}reportRetryCount = 5tableMetaCheckEnable = falsereportSuccessEnable = false}tm {commitRetryCount = 5rollbackRetryCount = 5}undo {dataValidation = truelogSerialization = "jackson"logTable = "undo_log"}log {exceptionRate = 100}}
- 修改Mapper,添加TCC事务数据库操作
定义Mapper接口方法:
@Mapperpublic interface StorageMapper extends BaseMapper<Storage> {/*** 减少库存* @param productId 商品id* @param count 要减少的商品数量*/void decrease(Long productId, Integer count);/*** 查询库存,判断有无足够库存* @param productId 产品id* @return 库存信息*/Storage findByProductId(Long productId);/*** 冻结库存* @param productId 产品id* @param count 数量*/void updateResidueToFrozen(Long productId,Integer count);void updateFrozenToUsed(Long productId,Integer count);void updateFrozenToResidue(Long productId,Integer count);}
创建xml配置操作数据库:
<?xml version="1.0" encoding="UTF-8" ?><!DOCTYPE mapperPUBLIC "-//mybatis.org//DTD Mapper 3.0//EN""http://mybatis.org/dtd/mybatis-3-mapper.dtd"><mapper namespace="cn.tedu.storage.mapper.StorageMapper"><resultMap id="BaseResultMap" type="Storage"><id column="id" property="id" jdbcType="BIGINT"/><result column="product_id" property="productId" jdbcType="BIGINT"/><result column="total" property="total" jdbcType="INTEGER"/><result column="used" property="used" jdbcType="INTEGER"/><result column="residue" property="residue" jdbcType="INTEGER"/></resultMap><update id="decrease">UPDATE storageSET used = used + #{count},residue = residue - #{count}WHERE product_id = #{productId}</update><select id="findByProductId" resultMap="BaseResultMap">select *from storagewhere id = #{productId};</select><update id="updateResidueToFrozen">update storageset residue = residue - #{count},frozen = frozen + #{count}where product_id = #{productId}</update><update id="updateFrozenToUsed">update storageset frozen = frozen - #{count},used = used + #{count}where product_id = #{productId}</update><update id="updateFrozenToResidue">update storageset frozen = frozen - #{count},residue = residue + #{count}where product_id = #{productId}</update></mapper>
- 定义TCC接口和实现
定义TCC接口:
@LocalTCCpublic interface StorageTccAction {@TwoPhaseBusinessAction(name = "StorageTccAction")boolean prepare(BusinessActionContext businessActionContext,@BusinessActionContextParameter(paramName = "productId") Long productId,@BusinessActionContextParameter(paramName = "count") Integer count);boolean commit(BusinessActionContext businessActionContext);boolean rollback(BusinessActionContext businessActionContext);}
定义TCC接口实现方法:
@Componentpublic class StorageTccActionImpl implements StorageTccAction {private final StorageMapper storageMapper;@Autowiredpublic StorageTccActionImpl(StorageMapper storageMapper) {this.storageMapper = storageMapper;}@Transactional@Overridepublic boolean prepare(BusinessActionContext businessActionContext, Long productId, Integer count) {Storage storage = storageMapper.findByProductId(productId);if (storage.getResidue() < count) {throw new RuntimeException("库存不足");}//冻结库存storageMapper.updateResidueToFrozen(productId, count);return true;}@Transactional@Overridepublic boolean commit(BusinessActionContext businessActionContext) {Long productId = Long.valueOf(businessActionContext.getActionContext("productId").toString());Integer count = Integer.valueOf(businessActionContext.getActionContext("count").toString());storageMapper.updateFrozenToUsed(productId, count);return true;}@Transactional@Overridepublic boolean rollback(BusinessActionContext businessActionContext) {Long productId = Long.valueOf(businessActionContext.getActionContext("productId").toString());Integer count = Integer.valueOf(businessActionContext.getActionContext("count").toString());storageMapper.updateFrozenToResidue(productId, count);return true;}}
调用TCC事务的Try方法
@Servicepublic class StorageImpl implements StorageService {private final StorageTccAction storageTccAction;@Autowiredpublic StorageImpl(StorageTccAction storageTccAction) {this.storageTccAction = storageTccAction;}@Overridepublic void decrease(Long productId, Integer count) {storageTccAction.prepare(null, productId, count);}}
