基本概念

比如有两台服务器,就先跑一下,如果没有问题才真正提交执行。但是假如服务1提交后服务2挂了(人工补偿,定时任务,脚本补偿)

理解CAP

CAP是Consistency、Availability、Parition tolerance三个词语的缩写,分别表示一致性、可用性、分区容忍性。
下边我们分别来解释 :
为了方便对CAP理论的理解,我们结合电商系统中的一些业务场景来理解CAP
如下图,是商品信息管理的执行流程 :
分布式事务 - 图1

整体执行流程如下 :
1、商品服务请求主数据库写入商品信息(添加商品、修改商品、删除商品)。
2、主数据库向商品服务响应写入成功。
3、商品服务请求从数据库读取商品信息。
C-Consistency
一致性是指写操作后的读操作可以读取到最新的数据状态,当数据分布在多个节点上,从任意节点读取到的数据都是最新的状态。
上图中,商品信息的读写要满足一致性就是要实现如下目标 :
1、商品服务写入主数据库成功,则向从数据库查询新数据也成功。
2、商品服务写入主数据库失败,则向从数据库查询新数据也失败。
如何实现一致性?
1、写入主数据库后要将数据同步到从数据库。
2、写入主数据库后,在向从数据库同步期间要将从数据库锁定,待同步完成后再释放锁,以免在新数据写入成功后,向从数据库查询到旧的数据。
分布式系统一致性的特点 :
1、由于存在数据同步的过程,写操作的响应会有一定的延迟。
2、为了保证数据一致性会对资源暂时锁定,待数据同步完成释放锁定资源。
3、如果请求数据同步失败的节点则会返回错误信息,一定不会返回旧数据。
A-Availability
可用性是指任何事务操作都可以得到响应结果,且不会出现响应超时或响应错误。
上图中,商品信息读取满足可用性就是要实现如下目标 :
1、从数据库接收到数据查询的请求则立即能够响应数据查询结果。
2、从数据库不允许出现响应超时或响应错误。
如何实现可用性?
1、写入主数据库后要将数据同步到从数据库。
2、由于要保证从数据库的可用性,不可将从数据库中的资源进行锁定。
3、即使数据还没有同步过来,从数据库也要返回要查询的数据,哪怕是旧数据,如果连旧数据也没有则可以按照约定返回一个默认信息,但不能返回错误或者响应超时。
分布式系统可用性的特点 :
1、所有请求都有响应,且不会出现响应超时或响应错误。
P-Partition tolerance
通常分布式系统的各个节点部署在不同的子网,这就是网络分区,不可避免的会出现由于网络问题而导致节点之间通讯失败,此时仍可对外提供服务,这叫分区容忍性。
上图中,商品信息读写满足分区容忍性就是要实现如下目标 :
1、主数据库向从数据库同步数据失败不影响读写操作。
2、其一个节点挂掉不影响另一个节点对外提供服务。
如何实现分区容忍性?
1、尽量使用异步取代同步操作,例如使用异步方式将数据从主数据库同步到从数据,这样节点之间能有效的实现松耦合。
2、添加从数据库节点,其中一个从节点挂掉其它从节点提供服务。
分布式分区容忍性的特点 :
1、分区容忍性是分布式系统具备的基本能力。

总结

通过上面的学习,CAP是一个已经被证实的理论 :一个分布式系统最多只能同时满足一致性(Consistency)、可用性(Availability)和分区容忍性(Partition tolerance)这三项中的两项。它可以作为我们架构设计、技术选型的考量标准。对于多数大型互联网应用的场景,节点众多、部署分散,而且现在的集群规模越来越大,所以节点故障、网络故障是常态,而且要保证服务可用性达到N个9(99.99.%),并要达到良好的响应性能来提高用户体验,因此一般都会做出如下选择 :保证P和A,舍弃C强一致性,保证最终一致性。

BASE理论

理解强一致性和最终一致性

CAP理论告诉我们一个分布式系统最多只能同时满足一致性(Consistency)、可用性(Availability)和分区容忍性(Partition tolerance)这三项中的两项,其中AP在实际应用中较多,AP既舍弃一致性,保证可用性和分区容忍性,但是在实际生产中很多场景都要实现一致性,比如前边我们举的例子,AP即舍弃一致性,保证可用性和分区容忍性,但是在实际产生中很多场景都要实现一致性,比如前边我们觉得例子主数据库向从数据库同步数据,即使不要一致性,但是最终也要将数据同步成功来保证数据一致,这种一致性和CAP中的一致性不同,CAP中的一致性要求在任何时间查询每个节点数据都必须一致,它强调的是强一致性,但是最终一致性是允许可以在一段时间内每个节点的数据不一致,但是经过一段时间每个节点的数据必须一致,它强调的是最终数据的一致性。

Base理论介绍

BASE是Basically Availbale(基本可用)、Soft state(软状态)和Eventually consistent(最终一致性)三个短语的缩写。BASE理论是对CAP中AP的一个扩展,通过牺牲强一致性来获得可用性,当出现故障允许部分不可用但要保证核心功能可用,允许数据在一段时间内是不一致的,但最终达到一致状态。满足BASE理论的事务,我们称之为“柔性事务”。

  • 基本可用 :分布式系统在出现故障时,允许损失部分可用功能,保证核心功能可用。如电商网址交易付款出现问题来,商品依然可以正常浏览。
  • 软状态:由于不要求强一致性,所以BASE允许系统中存在中间状态(也叫软状态),这个状态不影响系统可用性,如订单中的“支付中”、“数据同步中”等状态,待数据最终一致后状态改为“成功”状态。
  • 最终一致性:最终一致是指的经过一段时间后,所有节点数据都将会达到一致。如订单的“支付中”状态,最终会变为“支付成功”或者“支付失败”,使订单状态与实际交易结果达成一致,但需要一定时间的延迟、等待。

解决方案

XA、2PC

分布式事务解决方案之2PC(两阶段提交)

针对不同的分布式场景业界常见的解决方案有2PC、TCC、可靠消息最终一致性、最大努力通知这几种。

什么是2PC

2PC即两阶段提交协议,是将整个事务流程分为两个阶段,准备阶段(Prepare phase)、提交阶段(commit phase),2是指两阶段,P是指准备阶段,C是提交阶段。
举例 :张三和李四好久不见,老友约起聚餐,饭店老板要求先买单,才能出票。这时张三和李四分别抱怨近况不如意,囊肿羞涩,都不愿意请客,这时只能AA。只有张三和李四都付款,老板才能出票安排就餐。但由于张三和李四都是铁公鸡,形成两尴尬的一幕 :
准备阶段 :老板要求张三付款,张三付款。老板要求李四付款,李四付款。
提交阶段 :老板出票,两人拿票纷纷落座就餐。
例子中形成两一个事务,若张三或李四其中一个拒绝付款,或钱不够,店老板都不会给出票,并且会把已收款退回。
整个事务过程由事务管理器和参与者组成,店老板就是事务管理器,张三、李四就是事务参与者,事务管理器负责决策整个分布式事务的提交和回滚,事务参与者负责自己本地事务的提交和回滚。
在计算机中部分关系数据库如Oracle、MySQL支持两阶段提交协议,如下图 :
1. 准备阶段(Prepare phase):事务管理器给每个参与者发送Prepare消息,每个数据库参与者在本地执行事务,并写本地的Undo/Redo日志,此时事务没有提交。
(Undo日志是记录修改前的数据,用于数据库回滚,Redo日志是记录修改后的数据,用于提交事务后写入数据文件)
2. 提交阶段(commit phase):如果事务管理器收到两参与者的执行失败或者超时消息时,直接给每个参与者发送回滚(Rollback)消息;否则,发送提交(Commit)消息;参与者根据事务管理器的指令执行提交或者回滚操作,并释放事务处理过程中使用的锁资源。注意 :必须在最后阶段释放锁资源。
下图展示两2PC的两个阶段,分成功和失败两个情况说明 :
成功情况 :
分布式事务 - 图2
失败情况 :
分布式事务 - 图3

2PC与3PC的区别

相对于2PC,3PC主要解决的单点故障问题,并减少阻塞,因为一旦参与者无法及时收到来自协调者的信息之后,他会默认执行commit。而不会一直持有事务资源并处于阻塞状态。但是这种机制也会导致数据一致性问题,因为,由于网络原因,协调者发送的abort响应没有及时被参与者接收到,那么参与者在等待超时之后执行了commit操作。这样就和其他接到abort命令并执行回滚的参与者之间存在数据不一致的情况。

解决方案

XA方案

2PC的传统方案是在数据库层面实现的,如Oracle、MySQL都支持2PC协议,为了统一标准减少行业内不必要的对接成本,需要制定标准化的处理模型及接口标准,国际开放标准组织Open Group定义分布式事务处理模型DTP(Distributed Transaction Processing Reference Model)。
为了让大家更明确XA方案的内容,下面新用户注册送积分为例来说明 :
分布式事务 - 图4
执行流程如下 :
1、应用程序(AP)持有用户库和积分库两个数据源。
2、应用程序(AP)通过TM通知用户库RM新增用户,同时通知积分库RM为该用户新增积分,RM此时并未提交事务,此时用户和积分资源锁定。
3、TM收到执行回复,只要有一方失败则分别向其他RM发起回滚事务,回滚完毕,资源锁释放。
4、TM收到执行回复,全部成功,此时向所有RM发起提交事务,提交完毕,资源锁释放。
DTP模型定义如下角色 :

  • AP(Application Program) : 既应用程序,可以理解为使用DTP分布式事务的程序。
  • RM(Resource Manager) : 即资源管理器,可以理解为事务的参与者,一般情况下是指一个数据库实例,通过资源管理器对该数据库进行控制,资源管理器控制着分支事务。
  • TM(Transaction Manager) : 事务管理器,负责协调和管理事务,事务管理器控制着全局事务,管理事务生命周期,并协调各个RM。全局事务是指分布式事务处理环境中,需要操作多个数据库共同完成一个工作,这个工作即是一个全局事务。
  • DTP模型定义TM和RM之间通讯的接口规范叫XA,简单理解为数据库提供的2PC接口协议,基于数据库的XA协议来实现2PC又称为XA方案
  • 以上三个角色之间的交互方式如下 :
    1)TM向AP提供应用程序编程接口,AP通过TM提交及回滚事务。
    2)TM交易中间件通过XA接口来通知RM数据库事务的开始、结束以及提交、回滚等。
    总结 :
    整个2PC的事务流程涉及到三个角色AP、RM、TM。AP指的是使用2PC分布式事务的应用程序;RM指的是资源管理器,它控制着分支事务;TM指的是事务管理器,它控制着整个全局事务。
    1)在准备阶段RM执行实际的业务操作,但不提交事务,资源锁定;
    2)在提交阶段TM会接收RM在准备阶段的执行回复,只要有任一个RM执行失败,TM会通知所有RM执行回滚操作,否则,TM将会通知所有RM提交该事务。提交阶段结束资源锁释放。
    XA方案的问题 :
    1、需要本地数据库支持XA协议。
    2、资源锁需要等到两个阶段结束才释放,性能较差。

Seata方案

Seata是阿里中间件团队发起的开源项目Fescar,后更名Seata,它是一个是开源的分布式事务框架。传统2PC的问题在Seata中得到了解决,它通过对本地关系数据库的分支事务的协调来驱动完成全局事务,是工作在应用层的中间件。主要优点是性能较好,且不长时间占用连接资源,它以高效并且对业务0入侵的方式解决微服务场景下面临的分布式事务问题,它目前提供AT模式(即2PC)及TCC模式的分布式事务解决方案。
Seata的设计思想如下 :
Seata的设计目标其一是对业务无入侵,因此从业务无入侵的2PC方案着手,在传统2PC的基础上演进,并解决2PC方案面临的问题。
Seata把一个分布式事务理解成一个包含来若干分支事务的全局事务。全局事务的职责是协调其下管辖的分支事务达成一致,要么一起成功提交,要么一起失败回滚。此外,通常分支事务本身就是一个关系数据库的本地事务,下图是全局事务与分支事务的关系图 :
分布式事务 - 图5
与传统2PC的模型类似,Seata定义了三个组件来协议分布式事务的处理过程 :
分布式事务 - 图6

  • Transaction Coordinator(TC):事务协调器,它是独立的中间件,需要独立部署运行,它维护全局事务的运行状态,接收TM指令发起全局事务的提交与回滚,负责与RM通信协调各个分支事务的提交或回滚。
  • Transaction Manager(TM):事务管理器,TM需要嵌入应用程序中工作,它负责开启一个全局事务,并最终向TC发起全局提交或全局回滚的指令。
  • Resource Manager(RM):控制分支事务,负责分支注册、状态汇报,并接收事务协调器TC的指令,驱动分支(本地)事务的提交和回滚。
    还拿新用户注册送积分举例Seata的分布式事务过程 :

分布式事务 - 图7
具体的执行流程如下 :

  1. 用户服务的TM向TC申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的XID。
  2. 用户服务的RM向TC注册分支事务,该分支事务在用户服务执行新增用户逻辑,并将其纳入XID对应全局事务的管辖。
  3. 用户服务执行分支事务,向用户表插入一条记录。
  4. 逻辑执行到远程调用积分服务时(XID在微服务调用链路的上下文中传播)。积分服务的RM向TC注册分支事务,该分支事务执行增加积分的逻辑,并将其纳入XID对应全局事务的管辖。
  5. 积分服务执行分支事务,向积分记录表插入一条记录,执行完毕后,返回用户服务。
  6. 用户服务分支事务执行完毕。
  7. TM向TC发起针对XID的全局提交或回滚决议。
  8. TC调度XID下管辖的全部分支事务完成提交或回滚请求。

Seata实现2PC与传统2PC的差别 :
架构层次方面,传统2PC方案的RM实际上是在数据库层,RM本质上就是数据库自身,通过XA协议实现,而Seata的RM是以jar包的形式作为中间件层部署在应用程序的这一侧的。
两阶段提交方面,传统2PC无论第二阶段的决议是commit还是rollbcak,事务性资源的锁都要保持到Phase2完成才释放。而Seata的做法是在Phase1就将本地事务提交,这样就可以省去Phase2持锁的时间,整体提高效率。

Seata实现2PC事务

业务说明

本实例通过Seata中间件实现分布式事务,模拟两个账户的转账交易过程。两个账户在两个不同的银行(张三在bank1、李四在bank2),bank1和bank2是两个微服务。交易过程中,张三给李四转账制定金额。
上述交易步骤,要么一起成功,要么一起失败,必须是一个整体性的事务。
分布式事务 - 图8

程序组成部分

本实例程序组成 部分如下 :
数据库 :MySQL-5.7.25
包括bank1和bank2两个数据库。
JDK:1.8
微服务框架 :spring-boot-2.1.3、spring-cloud-Greenwich.RELEASE
seata客户端(RM、TM):spring-cloud-alibaba-seata-2.1.0RELEASE
seata服务端(TC):seata-server-0.7.1
微服务及数据库的关系 :
dtx/dtx-seata-demo/seata-demo-bank1 银行1,操作张三账户,链接数据库bank1
dtx/dtx-seata-demo/seata-demo-bank2 银行2,操作李四账户,链接数据库bank2
服务注册中兴 :dtx/discover-server
本实例程序技术架构如下 :
分布式事务 - 图9
交互流程如下 :
1、请求bank1进行转账,传入转账金额。
2、bank1减少转账金额,调用bank2,传入转账金额。

创建数据库

bank1库,包含张三账户

  1. CREATE DATABASE /*!32312 IF NOT EXISTS*/`bank1` /*!40100 DEFAULT CHARACTER SET utf8 */;
  2. USE `bank1`; /*Table structure for table `account_info` */
  3. DROP TABLE IF EXISTS `account_info`;
  4. CREATE TABLE `account_info` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `account_name` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '户主姓名', `account_no` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '银行卡号', `account_password` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '帐户密码', `account_balance` double DEFAULT NULL COMMENT '帐户余额', PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC; /*Data for the table `account_info` */ insert into `account_info`(`id`,`account_name`,`account_no`,`account_password`,`account_balance`) values (2,'张三','1',NULL,1000); /*Table structure for table `de_duplication` */ DROP TABLE IF EXISTS `de_duplication`; CREATE TABLE `de_duplication` ( `tx_no` varchar(64) COLLATE utf8_bin NOT NULL, `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC; /*Data for the table `de_duplication` */ /*Table structure for table `local_cancel_log` */ DROP TABLE IF EXISTS `local_cancel_log`; CREATE TABLE `local_cancel_log` ( `tx_no` varchar(64) NOT NULL COMMENT '事务id', `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; /*Data for the table `local_cancel_log` */ /*Table structure for table `local_confirm_log` */ DROP TABLE IF EXISTS `local_confirm_log`; CREATE TABLE `local_confirm_log` ( `tx_no` varchar(64) NOT NULL COMMENT '事务id', `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; /*Data for the table `local_confirm_log` */ /*Table structure for table `local_trade_log` */ DROP TABLE IF EXISTS `local_trade_log`; CREATE TABLE `local_trade_log` ( `tx_no` bigint(20) NOT NULL, `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC; DROP TABLE IF EXISTS `local_try_log`; CREATE TABLE `local_try_log` ( `tx_no` varchar(64) NOT NULL COMMENT '事务id', `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; /*Data for the table `local_try_log` */ /*Table structure for table `undo_log` */ DROP TABLE IF EXISTS `undo_log`; CREATE TABLE `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `branch_id` bigint(20) NOT NULL, `xid` varchar(100) NOT NULL, `context` varchar(128) NOT NULL, `rollback_info` longblob NOT NULL, `log_status` int(11) NOT NULL, `log_created` datetime NOT NULL, `log_modified` datetime NOT NULL, `ext` varchar(100) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE=InnoDB AUTO_INCREMENT=167 DEFAULT CHARSET=utf8; /*Data for the table `undo_log` */ insert into `undo_log`(`id`,`branch_id`,`xid`,`context`,`rollback_info`,`log_status`,`log_created`,`log_modified`,`ext`) values (166,2019228885,'192.168.1.101:8888:2019228047','serializer=jackson','{}',1,'2019-08-11 15:16:43','2019-08-11 15:16:43',NULL);

bank2库,包含李四账户

  1. CREATE DATABASE /*!32312 IF NOT EXISTS*/`bank2` /*!40100 DEFAULT CHARACTER SET utf8 */; USE `bank2`; /*Table structure for table `account_info` */ DROP TABLE IF EXISTS `account_info`; CREATE TABLE `account_info` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `account_name` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '户主姓名', `account_no` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '银行卡号', `account_password` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '帐户密码', `account_balance` double DEFAULT NULL COMMENT '帐户余额', PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC; /*Data for the table `account_info` */ insert into `account_info`(`id`,`account_name`,`account_no`,`account_password`,`account_balance`) values (3,'李四的账户','2',NULL,0); /*Table structure for table `de_duplication` */ DROP TABLE IF EXISTS `de_duplication`; CREATE TABLE `de_duplication` ( `tx_no` varchar(64) COLLATE utf8_bin NOT NULL, `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC; /*Data for the table `de_duplication` */ /*Table structure for table `local_cancel_log` */ DROP TABLE IF EXISTS `local_cancel_log`; CREATE TABLE `local_cancel_log` ( `tx_no` varchar(64) NOT NULL COMMENT '事务id', `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; /*Data for the table `local_cancel_log` */ /*Table structure for table `local_confirm_log` */ DROP TABLE IF EXISTS `local_confirm_log`; CREATE TABLE `local_confirm_log` ( `tx_no` varchar(64) NOT NULL COMMENT '事务id', `create_time` datetime DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8; /*Data for the table `local_confirm_log` */ /*Table structure for table `local_trade_log` */ DROP TABLE IF EXISTS `local_trade_log`; CREATE TABLE `local_trade_log` ( `tx_no` bigint(20) NOT NULL, `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC; DROP TABLE IF EXISTS `local_try_log`; CREATE TABLE `local_try_log` ( `tx_no` varchar(64) NOT NULL COMMENT '事务id', `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; /*Data for the table `local_try_log` */ /*Table structure for table `undo_log` */ DROP TABLE IF EXISTS `undo_log`; CREATE TABLE `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `branch_id` bigint(20) NOT NULL, `xid` varchar(100) NOT NULL, `context` varchar(128) NOT NULL, `rollback_info` longblob NOT NULL, `log_status` int(11) NOT NULL, `log_created` datetime NOT NULL, `log_modified` datetime NOT NULL, `ext` varchar(100) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

启动TC(事务协调器)

(1)下载seata服务器
下载地址 :seata服务器
(2)解压并启动
winodws :【seata服务端解压路径】/bin/seata-server.bat -p 8888 -m file
mac/linux : 【seata服务端解压路径】nohup sh seata-server.sh -p 8888 -h 127.0.0.1 -m file &> seata.log &
注 :其中8888为服务端口号;file为启动模式,这里指seata服务将采用文件的方式存储信息。
分布式事务 - 图10
如上图出现“Server started。。。“的字样则表示启动成功。

discover-server

discover-server是服务注册中心,测试工程将自己注册至discover-server。

创建dtx-seata-demo

dtx-seata-demo是seata的测试工程,根据业务需求需要创建两个dex-seata-demo工程。
(1)父工程maven依赖说明
在dtx父工程中指定了SpringBoot和SpringCloud版本
分布式事务 - 图11
在dtx-seata-demo父工程中指定了spring-cloud-alibaba-dependencies的版本。
分布式事务 - 图12
(3)配置seata
在src/main/resource中,新增registry.conf、file.conf文件,内容可拷贝seata-server-0.7.1中的配置文件子。 在registry.conf中registry.type使用file:
分布式事务 - 图13
在file.conf中更改service.vgroup_mapping.[springcloud服务名]-fescar-service-group = “default”,并修改 service.default.grouplist =[seata服务端地址]
分布式事务 - 图14
关于vgroup_mapping的配置:
vgroup_mapping.事务分组服务名=Seata Server集群名称(默认名称为default) default.grouplist = Seata Server集群地址
在 org.springframework.cloud:spring-cloud-starter-alibaba-seata 的 org.springframework.cloud.alibaba.seata.GlobalTransactionAutoConfiguration 类中,默认会使用 ${spring.application.name}-fescar-service-group 作为事务分组服务名注册到 Seata Server上,如果和 file.conf 中的配置不一致,会提示 no available server to connect 错误
也可以通过配置 spring.cloud.alibaba.seata.tx-service-group 修改后缀,但是必须和 file.conf 中的配置保持 一致。
(4)创建代理数据源
新增DatabaseConfiguration.java,Seata的RM通过DataSourceProxy才能在业务代码的事务提交时,通过这个切 入点,与TC进行通信交互、记录undo_log等。
@Configuration public class DatabaseConfiguration { @Bean @ConfigurationProperties(prefix = “spring.datasource.ds0”) public DruidDataSource ds0() { DruidDataSource druidDataSource = new DruidDataSource(); return druidDataSource; } @Primary @Bean public DataSource dataSource(DruidDataSource ds0) { DataSourceProxy pds0 = new DataSourceProxy(ds0); return pds0; } }
3.3.7 Seata执行流程
1、正常提交流程
分布式事务 - 图15
2、回滚流程
回滚流程省略前的RM注册过程。
分布式事务 - 图16
要点说明 :
1、每个RM使用DataSourceProxy连接数据库,其目的是使用ConnectionProxy,使用数据源和数据连接代理的目的就是第一阶段将undo_log和业务数据放在一个本地事务提交,这样就保存了只要有业务操作就一定有undo_log.
2、在第一阶段undo_log中存放了数据修改前和修改后的值,为事务回滚作好准备,所以第一阶段完成就已经将分支事务提交,也就释放了锁资源。
3、TM开启全局事务开始,将XID全局事务id放在事务上下午中,通过feign调用也将XID传入下游分支事务,每个分支事务将自己的Branch ID分支事务ID与XID关联。
4、第二阶段全局事务提交,TC会通知各个分支参与者提交分支事务,在第一阶段就已经提交了分支事务,这里各个参与者只需要删除undo_log即可,并且可以异步执行,第二阶段很快可以完成。
5、第二阶段全局事务回滚,TC会通知各个分支参与者回滚分支事务,通过XID和Branch ID找到相应的回滚日志,通过回滚日志生成反向的SQL并执行,以完成分支事务回滚到之前的状态,如果回滚失败则会重试回滚操作。
3.3.8 dtx-seata-demo-bank1
dtx-seata-demo-bank1实现如下功能:
1、张三账户减少金额,开启全局事务。
2、远程调用bank2向李四转账。
(1)DAO

  1. @Mapper
  2. @Component
  3. public interface AccountInfoDao {
  4. //更新账户金额
  5. @Update("update account_info set account_balance = account_balance + #{amount} where account_no = #{accountNo}") int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);
  6. }

(2) FeignClient
远程调用bank2的客户端

  1. @FeignClient(value = "seata‐demo‐bank2",fallback = Bank2ClientFallback.class)
  2. public interface Bank2Client {
  3. @GetMapping("/bank2/transfer")
  4. String transfer(@RequestParam("amount") Double amount); }
  5. @Component
  6. public class Bank2ClientFallback implements Bank2Client{
  7. @Override
  8. public String transfer(Double amount) { return "fallback"; }
  9. }

(3)Service

  1. @Service
  2. public class AccountInfoServiceImpl implements AccountInfoService {
  3. private Logger logger = LoggerFactory.getLogger(AccountInfoServiceImpl.class);
  4. @Autowired AccountInfoDao accountInfoDao;
  5. @Autowired Bank2Client bank2Client;
  6. //张三转账
  7. @Override
  8. @GlobalTransactional
  9. @Transactional
  10. public void updateAccountBalance(String accountNo, Double amount) {
  11. //logger.info("******** Bank1 Service Begin ... xid: {}" , RootContext.getXID());
  12. //张三扣减金额
  13. accountInfoDao.updateAccountBalance(accountNo,amount*‐1);
  14. //向李四转账
  15. String remoteRst = bank2Client.transfer(amount);
  16. //远程调用失败
  17. if(remoteRst.equals("fallback")){
  18. throw new RuntimeException("bank1 下游服务异常");
  19. }
  20. //人为制造错误
  21. if(amount==3){
  22. throw new RuntimeException("bank1 make exception 3");
  23. }
  24. }
  25. }

将@GlobalTransactional注解标注在全局事务发起的Service实现方法上,开启全局事务 :
GlobalTransactionalInterceptor会拦截@GlobalTransactional注解的方法,生成全局事务ID(XID),XID会在整个分布式事务中传递。
在远程调用时,spring-cloud-alibaba-seata会拦截Feign调用将XID传递到下游服务。
(6)Controller

  1. @RestController
  2. public class Bank1Controller {
  3. @Autowired AccountInfoService accountInfoService;
  4. //转账
  5. @GetMapping("/transfer")
  6. public String transfer(Double amount){
  7. accountInfoService.updateAccountBalance("1",amount);
  8. return "bank1"+amount;
  9. }
  10. }

3.3.9 dtx-seata-demo-bank2
dtx-seata-demo-bank2实现如下功能:
1、李四账户增加金额。
dtx-seata-demo-bank2在本账户事务中作为分支事务不使用@GlobalTransactional。
(1)DAO

  1. @Mapper
  2. @Component
  3. public interface AccountInfoDao {
  4. //向李四转账
  5. @Update("UPDATE account_info SET account_balance = account_balance + #{amount} WHERE account_no = #{accountNo}")
  6. int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);
  7. }

(2)Service

  1. @Service
  2. public class AccountInfoServiceImpl implements AccountInfoService {
  3. private Logger logger = LoggerFactory.getLogger(AccountInfoServiceImpl.class);
  4. @Autowired AccountInfoDao accountInfoDao;
  5. @Override
  6. @Transactional
  7. public void updateAccountBalance(String accountNo, Double amount) {
  8. http://
  9. logger.info("******** Bank2 Service Begin ... xid: {}" , RootContext.getXID());
  10. //李四增加金额
  11. accountInfoDao.updateAccountBalance(accountNo,amount);
  12. //制造异常
  13. if(amount==2){ throw new RuntimeException("bank1 make exception 2"); }
  14. }
  15. }

(3)Controller

  1. @RestController
  2. public class Bank2Controller {
  3. @Autowired
  4. AccountInfoService accountInfoService;
  5. @GetMapping("/transfer") public String transfer(Double amount){
  6. accountInfoService.updateAccountBalance("2",amount); return "bank2"+amount;
  7. }
  8. }

3.3.10 测试场景

  • 张三向李四转账成功。
  • 李四事务失败,张三事务回滚成功。
  • 张三事务失败,李四事务回滚成功。
  • 分支事务超时测试。

3.4. 小结
传统2PC(基于数据库XA协议)和Seata实现2PC的两种2PC方案,由于Seata的零入侵并且解决了传统2PC长期锁资源的问题,所以推荐采用Seata实现2PC。
Seata实现2PC要点 :
1、全局事务开始使用GlobalTransactional标识。
2、每个本地事务方案仍然使用@Transactional标识。
3、每个数据都需要创建undo_log表,此表是Seata保证本地事务一致性的关键。

TCC

分布式事务解决方案之TCC

什么是TCC事务

TCC是Try、Confirm、Cancel三个词语的缩写,TCC要求每个分支事务实现三个操作 :预处理Try、确认Confirm、撤销Cancel。Try操作做业务检查及资源预留,Confirm做业务确认操作,Cancel实现一个与Try相反的操作既回滚操作。TM首先发起所有的分支事务的try操作,任何一个分支事务的try操作执行失败,TM将会发起所有分支事务的Cancel操作,若try操作全部成功,TM将会发起所有分支事务的Confirm操作,其中Confirm/Cancel操作若执行失败,TM会进行重试。
分布式事务 - 图17
分支事务失败的情况 :
分布式事务 - 图18
TCC分为三个阶段 :

  1. Try阶段是做业务检查(一致性)及资源预留(隔离),此阶段仅是一个初步操作,它和后续的Confirm一起才能真正构成一个完整的业务逻辑。
  2. Confirm阶段是做确认提交,Try阶段所有分支事务执行成功后开始执行Confirm。通常情况下,采用TCC则认为Confirm阶段是不会出错的。即 :只要Try成功,Confirm一定成功。若Confirm阶段真的出错了,需引入重试机制或人工处理。
  3. Cancel阶段是在业务执行错误需要回滚的状态下执行分支事务的业务取消,预留资源释放。通常情况下,采用TCC则认为Cancel阶段也是一定成功的。若Cancel阶段真的出错了,需引入重试机制或人工处理。
  4. TM事务管理器
    TM事务管理器可以实现为独立的服务,也可以让全局事务发起方充当TM的角色,TM独立出来是为了成为公用组件,是为了考虑结构和软件复用。
    TM在发起全局事务时生成全局事务记录,全局事务ID贯穿整个分布式事务调用链条,用来记录事务上下文,追踪和记录状态,由于Confirm和Cancel失败需进行重试,因此需要实现为幂等性是指同一个操作无论请求多少次,其结果都相同。

TCC解决方案

目前市面上的TCC框架众多比如下面这几种 :
分布式事务 - 图19
Seata也支持TCC,但Seata的TCC模式对Spring Cloud并没有提供支持。我们的目标是理解TCC原理以及事务协调运作的过程,因此更倾向于轻量级易于理解的框架。
Hmily是一个高性能分布式事务TCC开源框架。基于Java语言来开发(JDK1.8),支持Dubbo,Spring Cloud等RPC框架进行分布式事务。它目前支持以下特性 :

  • 支持嵌套事务(Nested transaction support)。
  • 采用disruptor框架进行事务日志的异步读写,与RPC框架的性能毫无差别。
  • 支持SpringBoot-starter项目启动,使用简单。
  • RPC框架支持 :dubbo、motan、springcloud。
  • 本地事务存储支持 :redis、mongodb、zookeeper、file、mysql。
  • 事务日志序列化支持 :java、hessian、kryo、protostuff。
  • 采用Aspect AOP切面思想与Spring无缝集成,天然支持集群。
  • RPC事务恢复,超时异常恢复等。
    Hmily利用AOP对参与分布式事务的本地方法与远程方法进行拦截处理,通过多方拦截,事务参与者能透明的调用到另一方的Try、Confirm、Cancel方法;传递事务上下文;并记录事务日志,酌情进行补偿,重试等。
    Hmily不需要事务协调服务,但需要提供一个数据库(mysql/mongodb/zookeeper/redis/file)来进行日志存储。
    Hmily实现的TCC服务与普通的服务一样,只需要暴露一个接口,也就是它的Try业务。Confirm/Cancel业务逻辑,只是因为全局事务提交/回滚的需要才提供的,因此Confirm/Cancel业务只需要被Hmily TCC事务框架发现即可,不需要被调用它的其他业务服务所感知。
    官网介绍 :https://dromara.org/website/zh-cn/docs/hmily/index.html
    TCC需要注意三种异常处理分别是空回滚、幂等、悬挂 :
    空回滚
    在没有调用TCC资源Try方法的情况下,调用来二阶段的Cancel方法,Cancel方法需要识别出这是一个空回滚,然后直接返回成功。
    出现原因是当一个分支事务所在服务宕机或网络异常,分支事务调用记录为失败,这个时候其实是没有执行Try阶段,当故障恢复后,分布式事务进行回滚则会调用二阶段的Cancel方法,从而形成空回滚。
    解决思路是关键就是要识别出这个空回滚。思路很简单就是需要知道一阶段是否执行,如果执行来,那就是正常回滚;如果没执行,那就是空回滚。前面已经说过TM在发起全局事务时生成全局事务记录,全局事务ID贯穿整个分布式事务调用链条。再额外增加一张分支事务记录表,其中有全局事务ID和分支事务ID,第一阶段Try方法里会插入一条记录,表示一阶段执行来。Cancel接口里读取该记录,如果该记录存在,则正常回滚;如果该记录不存在,则是空回滚。
    幂等
    通过前面介绍已经了解到,为了保证TCC二阶段提交重试机制不会引发数据不一致,要求TCC的二阶段Try、Confirm和Cancel接口保证幂等,这样不会重复使用或者释放资源。如果幂等控制没有做好,很有可能导致数据不一致等严重问题。
    解决思路在上述 “分支事务记录”中增加执行状态,每次执行前都查询该状态。
    悬挂
    悬挂就是对于一个分布式事务,其二阶段Cancel接口比Try接口先执行。
    出现原因是在RPC调用分支事务try时,先注册分支事务,再执行RPC调用,如果此时RPC调用的网络发生拥堵,通常RPC调用是有超时时间的,RPC超时以后,TM就会通知RM回滚该分布式事务,可能回滚完成后,RPC请求才到达参与者真正执行,而一个Try方法预留的业务资源,只有该分布式事务才能使用,该分布式事务第一阶段预留的业务资源就再也没有人能够处理了,对于这种情况,我们就称为悬挂,即业务资源预留后无法继续处理。
    解决思路是如果二阶段执行完成,那一阶段就不能再继续执行。在执行一阶段事务时判断在该全局事务下,“分支事务记录”表中是否已经有二阶段事务记录,如果有则不执行Try。
    举例,场景为A转账30元给B,A和B账户在不同的服务。
    方案 1
    账户A

try: 检查余额是否够30元 扣减30元 confirm: 空 cancel: 增加30元
账户B
try: 增加30元 confirm: 空 cancel: 减少30元
方案1说明:
1)账户A,这里的余额就是所谓的业务资源,按照前面提到的原则,在第一阶段需要检查并预留业务资源,因此, 我们在扣钱 TCC 资源的 Try 接口里先检查 A 账户余额是否足够,如果足够则扣除 30 元。 Confirm 接口表示正式 提交,由于业务资源已经在 Try 接口里扣除掉了,那么在第二阶段的 Confirm 接口里可以什么都不用做。Cancel 接口的执行表示整个事务回滚,账户A回滚则需要把 Try 接口里扣除掉的 30 元还给账户。
2)账号B,在第一阶段 Try 接口里实现给账户B加钱,Cancel 接口的执行表示整个事务回滚,账户B回滚则需要把 Try 接口里加的 30 元再减去。
方案1的问题分析:
1)如果账户A的try没有执行在cancel则就多加了30元。
2)由于try,cancel、confirm都是由单独的线程去调用,且会出现重复调用,所以都需要实现幂等。
3)账号B在try中增加30元,当try执行完成后可能会其它线程给消费了。
4)如果账户B的try没有执行在cancel则就多减了30元。
问题解决:
1)账户A的cancel方法需要判断try方法是否执行,正常执行try后方可执行cancel。
2)try、cancel、confirm方法实现幂等。
3)账户B在try方法中不允许更新账户金额,在confirm中更新账户金额。
4)账户B的cancel方法需要判断try方法是否执行,正常执行try后方可执行cancel。
优化方案:
账户A :
try: try幂等校验 try悬挂处理 检查余额是否够30元 扣减30元 confirm: 空 cancel: cancel幂等校验 cancel空回滚处理 增加可用余额30元
账户B :
try: 空 confirm: confirm幂等校验 正式增加30元 cancel: 空
4.3. Hmily实现TCC事务
4.3.1.业务说明
通过Hmily实现TCC分布式事务,模拟两个账户的转账交易过程。
两个账户分别在不同的银行(张三在bank1、李四在bank2),bank1、bank2是两个微服务。交易过程是,张三给李四转账制定金额。
上述交易步骤,要么一起成功,要么一起失败,必须是一个整体性事务。
分布式事务 - 图20

4.3.2. 程序组成部分
数据库:MySQL-5.7.25
JDK:64位 jdk1.8.0_201 微服务:spring-boot-2.1.3、spring-cloud-Greenwich.RELEASE Hmily:hmily-springcloud.2.0.4-RELEASE
微服务及数据库的关系 :
dtx/dtx-tcc-demo/dtx-tcc-demo-bank1 银行1,操作张三账户, 连接数据库bank1 dtx/dtx-tcc-demo/dtx-tcc-demo-bank2 银行2,操作李四账户,连接数据库bank2
服务注册中心:dtx/discover-server
4.3.3. 创建数据库
创建hmily数据库,用于存储hmily框架记录的数据。

  1. CREATE DATABASE hmily CHARACTER SET utf8 COLLATE utf8_general_ci’;

创建bank1库,并导入以下表结构和数据(包含张三账户)

  1. CREATE DATABASE bank1 CHARACTER SET utf8 COLLATE utf8_general_ci’;

创建bank2库,并导入以下表结构和数据(包含李四账户)

  1. CREATE DATABASE bank2 CHARACTER SET utf8 COLLATE utf8_general_ci’;
  2. DROP TABLE IF EXISTS account_info;
  3. CREATE TABLE account_info (
  4. id bigint(20) NOT NULL AUTO_INCREMENT,
  5. account_name varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT ‘户 主姓名’,
  6. account_no varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT ‘银行 卡号’,
  7. account_password varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT ‘帐户密码’,
  8. account_balance double NULL DEFAULT NULL COMMENT ‘帐户余额’,
  9. PRIMARY KEY (id) USING BTREE
  10. ) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
  11. INSERT INTO account_info VALUES (2, ‘张三的账户’, 1’, ‘’, 10000);

每个数据库都创建try、confirm、cancel三张日志表:

CREATE TABLE `local_try_log` ( `tx_no` varchar(64) NOT NULL COMMENT `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `local_confirm_log` ( `tx_no` varchar(64) NOT NULL COMMENT `create_time` datetime DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8 CREATE TABLE `local_cancel_log` ( `tx_no` varchar(64) NOT NULL COMMENT `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

4.3.5 工程dtx-tcc-demo
(1)引入maven依赖
org.dromara hmily‐springcloud 2.0.4‐RELEASE
(2)配置hmily
application.yml :

org: 
    dromara: 
      hmily: 
        serializer : kryo
      recoverDelayTime : 128
            retryMax: 30
            scheduledDelay: 128
            scheduledThreadMax: 10
      repositorySupport: db
            started: true
      hmilyDbConfig : 
        driverClassName: com.mysql.jdbc.Driver
        url: jdbc:mysql://localhost:3306/bank?useUnicode=true
        username: root
        password: root

新增配置类接收application.yml中的Hmily配置信息,并创建HmilyTransactionBootstrap Bean:

@Bean
public HmilyTransactionBootstrap hmilyTransactionBootstrap(

    HmilyInitService hmilyInitService){

        HmilyTransactionBootstrap hmilyTransactionBootstrap = new HmilyTransactionBootstrap(hmilyInitService);
        hmilyTransactionBootstrap.setSerializer(env.getProperty("org.dromara.hmily.serializer"));
        hmilyTransactionBootstrap.setRecoverDelayTime(Integer.parseInt(env.getProperty("org.dromara.hmily.recoverDelayTime"))); 
        hmilyTransactionBootstrap.setRetryMax(Integer.parseInt(env.getProperty("org.dromara.hmily.retryMax"))); 
        hmilyTransactionBootstrap.setScheduledDelay(Integer.parseInt(env.getProperty("org.dromara.hmily.scheduledDelay"))); 
        hmilyTransactionBootstrap.setScheduledThreadMax(Integer.parseInt(env.getProperty("org.dromara.hmily.scheduledThreadMax"))); 
        hmilyTransactionBootstrap.setRepositorySupport(env.getProperty("org.dromara.hmily.repositorySupport")); 
        hmilyTransactionBootstrap.setStarted(Boolean.parseBoolean(env.getProperty("org.dromara.hmily.started"))); 
        HmilyDbConfig hmilyDbConfig = new HmilyDbConfig(); 
        hmilyDbConfig.setDriverClassName(env.getProperty("org.dromara.hmily.hmilyDbConfig.driverClassName")); 
        hmilyDbConfig.setUrl(env.getProperty("org.dromara.hmily.hmilyDbConfig.url")); 
        hmilyDbConfig.setUsername(env.getProperty("org.dromara.hmily.hmilyDbConfig.username")); 
        hmilyDbConfig.setPassword(env.getProperty("org.dromara.hmily.hmilyDbConfig.password"));
        hmilyTransactionBootstrap.setHmilyDbConfig(hmilyDbConfig);
        return hmilyTransactionBootstrap;
    }
}

启动类增加@EnableAspectJAutoProxy并增加org.dromara.hmily的扫描项:

@SpringBootApplication
@EnableDiscoveryClient
@EnableHystrix 
@EnableFeignClients(basePackages = {"cn.itcast.dtx.tccdemo.bank1.spring"}) 
@ComponentScan({"cn.itcast.dtx.tccdemo.bank1","org.dromara.hmily"}) public class Bank1HmilyServer { 
    public static void main(String[] args) { 
        SpringApplication.run(Bank1HmilyServer.class, args); 
    } 
}

4.3.7 dtx-tcc-demo-bank1
dtx-tcc-demo-bank1实现try和cancel方法,如下 :
try: try幂等校验 try悬挂处理 检查余额是够扣减金额 扣减金额 confirm: 空 cancel: cancel幂等校验 cancel空回滚处理 增加可用余额

  1. Dao
    @Mapper 
    @Component 
    public interface AccountInfoDao { 
     @Update("update account_info set account_balance=account_balance - #{amount} where account_balance>=#{amount} and account_no=#{accountNo} ") int subtractAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount); @Update("update account_info set account_balance=account_balance + #{amount} where account_no=#{accountNo} ") int addAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount); 
     /** * 增加某分支事务try执行记录 * @param localTradeNo 本地事务编号 * @return */ 
     @Insert("insert into local_try_log values(#{txNo},now());") int addTry(String localTradeNo); 
     @Insert("insert into local_confirm_log values(#{txNo},now());") int addConfirm(String localTradeNo);
     @Insert("insert into local_cancel_log values(#{txNo},now());") int addCancel(String localTradeNo);
     /** * 查询分支事务try是否已执行 * @param localTradeNo 本地事务编号 * @return */ 
     @Select("select count(1) from local_try_log where tx_no = #{txNo} ") int isExistTry(String localTradeNo); 
     /** * 查询分支事务confirm是否已执行 * @param localTradeNo 本地事务编号 * @return */ 
     @Select("select count(1) from local_confirm_log where tx_no = #{txNo} ") int isExistConfirm(String localTradeNo); 
     /** * 查询分支事务cancel是否已执行 * @param localTradeNo 本地事务编号 * @return */
     @Select("select count(1) from local_cancel_log where tx_no = #{txNo} ") int isExistCancel(String localTradeNo); 
    }
    

2)try和cancel方法

@Slf4j 
@Service 
public class AccountInfoServiceImpl implements AccountInfoService { 
    @Autowired private AccountInfoDao accountInfoDao; 
    @Autowired private Bank2Client bank2Client; 
    /** * 只要标记@Hmily就是try方法,在注解中指定confirm、cancel两个方法的名字 * * @param accountNo * @param amount */ 
    @Hmily(confirmMethod = "commit", cancelMethod = "rollback") @Transactional(rollbackFor = Exception.class) 
    @Override 
    public void updateAccountBalance(String accountNo, Double amount) { 
        // 事务id 
        String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
        //log.info("Bank1 Service begin try ..." + transId); int existTry = accountInfoDao.isExistTry(transId); 
        // 幂等判断 判断local_try_log表中是否有try日志记录,如果有不再执行 
        // try幂等校验 
        if (existTry > 0) { 
           log.info("Bank1 Service 已经执行try,无需重复执行,事务id :{}", transId);
            return; 
        } 
        // try悬挂处理,如果cancel、confirm有一个已经执行了,try不再执行 
        if (accountInfoDao.isExistCancel(transId) > 0 || accountInfoDao.isExistConfirm(transId) > 0) { 
            //log.info("Bank1 Service 已经执行confirm或cancel,悬挂处理,事务id :{}", transId); return; } 
            // 从账户扣减 
            if (accountInfoDao.subtractAccountBalance(accountNo, amount) <= 0) { 
                // 扣减失败 
                throw new HmilyRuntimeException("bank1 exception, 扣减失败,事务id :{}" + transId); 
            } 
                // 增加本地事务try成功记录,用于幂等性控制标识 
            accountInfoDao.addTry(transId); 
            // 远程调用bank2 
            if (bank2Client.transfer(amount)) { 
                throw new HmilyRuntimeException("bank2Client exception,事务id:{}"+transId); 
            } 
            // 异常一定要抛在Hmily里面 
            if (amount == 10) { 
                throw new RuntimeException("Bank2 make exception 10"); 
            } 
            //log.info("Bank2 Service end try .." + transId); 
        } 
        @Transactional(rollbackFor = Exception.class) 
        public void commit(String accountNo, double amount) { 
            String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
            //log.info("Bank1 Service begin commit .." + transId); 
        } 
        @Transactional(rollbackFor = Exception.class) 
        public void rollback(String accountNo, double amount) { 
            String transId = HmilyTransactionContextLocal.getInstance().get().getTransId(); 
            //log.info("Bank1 Service begin rollback..." + transId); 
            // 空回滚处理,try阶段没有执行什么也不用做。 
            if (accountInfoDao.isExistTry(transId) == 0) {
                //log.info("Bank1 try 阶段失败 。。无需rollback " + transId); 
                return; 
            }
            // 幂等性校验,已经执行过了,什么也不用做 
            if (accountInfoDao.isExistCancel(transId) > 0) { 
                //log.info("Bank1 已经执行过rollback 。。无需再次rollback " + transId); 
                return;
            } 
            // 再将金额加回账户
            accountInfoDao.addAccountBalance(accountNo, amount); 
            // 添加cancel日志,用于幂等性控制标识 
            accountInfoDao.addCancel(transId); 
            //log.info("Bank1 Service end rollback ... " + transId); 
        } 
    }

3)feignClient

@FeignClient(value = "seata-demo-bank2", fallback = Bank2Client.class) 
public interface Bank2Client { 
    @GetMapping("/bank2/transfer") 
    @Hmily 
    Boolean transfer(@RequestParam("amount") Double amount); 
}
  1. Controller
    @RestController 
    public class Bank1Controller { 
     @Autowired private AccountInfoService accountInfoService;
     @RequestMapping("/transfer") 
     public String test(@RequestParam("amount") Double amount) { 
         accountInfoService.updateAccountBalance("1", amount); 
         return "bank1" + amount; 
     } 
    }
    

4.3.8 dtx-tcc-demo-bank2
dtx-tcc-demo-bank2实现如下功能 :
try: 空 confirm: confirm幂等校验 正式增加金额 cancel: 空
1)Dao

@Component 
@Mapper 
public interface AccountInfoDao { 
    @Update("update account_info set account_balance=account_balance + #{amount} where account_no=#{accountNo} ") int addAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount); 
    /** * 增加某分支事务try执行记录 * @param localTradeNo 本地事务编号 * @return */ 
    @Insert("insert into local_try_log values(#{txNo},now());") int addTry(String localTradeNo); 
    @Insert("insert into local_confirm_log values(#{txNo},now());") int addConfirm(String localTradeNo); 
    @Insert("insert into local_cancel_log values(#{txNo},now());") int addCancel(String localTradeNo); 
    /** * 查询分支事务try是否已执行 * @param localTradeNo 本地事务编号 * @return */ 
    @Select("select count(1) from local_try_log where tx_no = #{txNo} ") int isExistTry(String localTradeNo); 
    /** * 查询分支事务confirm是否已执行 * @param localTradeNo 本地事务编号 * @return */ 
    @Select("select count(1) from local_confirm_log where tx_no = #{txNo} ") int isExistConfirm(String localTradeNo); 
    /** * 查询分支事务cancel是否已执行 * @param localTradeNo 本地事务编号 * @return */ 
    @Select("select count(1) from local_cancel_log where tx_no = #{txNo} ") int isExistCancel(String localTradeNo); 
}

2)实现confirm方法

@Slf4j
@Service
public class AccountInfoServiceImpl implements AccountInfoService {
    @Autowired private AccountInfoDao accountInfoDao;
    @Transactional(rollbackFor = Exception.class) 
    @Hmily(confirmMethod = "confirmMethod", cancelMethod = "cancelMethod")
    @Override public void updateAccountBalance(String accountNo, Double amount) {
        String transId = HmilyTransactionContextLocal.getInstance().get().getTransId(); 
        //log.info("Bank2 Service Begin try ... " + transId); 
    } 
    @Transactional(rollbackFor = Exception.class)
    public void confirmMethod(String accountNo, Double amount) { 
        String transId = HmilyTransactionContextLocal.getInstance().get().getTransId(); 
        //log.info("Bank2 Service commit ..." + transId);
        // 幂等性校验,已经执行过了,什么也不用做
        if (accountInfoDao.isExistConfirm(transId) > 0) { 
            //log.info("Bank2 已经执行过confirm 。。无需再次confirm " + transId); 
            return; 
        } 
        // 正式增加金额 
        accountInfoDao.addAccountBalance(accountNo, amount); 
        // 添加confirm日志
        accountInfoDao.addConfirm(transId); }
    @Transactional(rollbackFor = Exception.class) 
    public void cancelMethod(String accountNo, Double amount) { 
        String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
        //log.info("Bank2 Service begin cancel ... " + transId); 
    }
}

3)Controller

@RestController
public class Bank2Controller {
    @Autowired
    private AccountInfoService accountInfoService; 
    @RequestMapping("/transfer") 
    public Boolean test2(@RequestParam("amount") Double amount) { 
        accountInfoService.updateAccountBalance("2", amount); 
        return true; 
    } 
}

3.3.9 测试场景

  • 张三向李四转账成功。
  • 李四事务失败,张三事务回滚成功。
  • 张三事务失败,李四分支事务回滚成功。
  • 分支事务超时测试。

4.4. 小结
如果拿TCC事务的处理流程与2PC两阶段提交做比较,2PC通常都是在跨库的DB层面,而TCC则在应用层面的处 理,需要通过业务逻辑来实现。这种分布式事务的实现方式的优势在于,可以让应用自己定义数据操作的粒度,使得降低锁冲突、提高吞吐量成为可能
而不足之处则在于对应用的侵入性非常强,业务逻辑的每个分支都需要实现try、confirm、cancel三个操作。此外,其实现难度也比较大,需要按照网络状态、系统故障等不同的失败原因实现不同的回滚策略。

最大努力通知

什么是最大努力通知

最大努力通知也是一种解决分布式事务的方案,下边是一个是充值的例子:
分布式事务 - 图21
交互流程 : 1、账户系统调用充值系统接口 2、充值系统完成支付处理向账户系统发起充值结果通知 若通知失败,则充值系统按策略进行重复通知 3、账户系统接收到充值结果通知修改充值状态 4、账户系统未接收到通知会主动调用充值系统的接口查询充值结果 通过上边的例子我们总结最大努力通知方案的目标 : 目标 :发起通知方通过一定的机制最大努力将业务处理结果通知到接收方。 具体包括 : 1、有一定的消息重复通知机制。 因为接收通知方可能没有接收到通知,此时要有一定的机制对消息重复通知。 2、消息校对机制。 如果尽最大努力也没有通知到接收方,或者接收方消费消息后要再次消费,此时可由接收方主动向通知方查询消息信息来满足需求。 最大努力通知与可靠消息一致性有什么不同? 1、解决方案思想不同 可靠消息一致性,发起通知方需要保证将消息发出去,并且将消息发到接收通知方,消息的可靠性关键由发起通知方来保证。 最大努力通知,发起通知方尽最大的努力将业务处理结果通知为接收通知方,但是可能消息接收不到,此时需要接收通知方主动调用发起通知方的接口查询业务处理结果,通知的可靠性关键在接收通知方。 2、两者的业务应用场景不同 可靠消息一致性关注的是交易过程的事务一致,以异步的方式完成交易。 最大努力通知关注的是交易后的通知事务,即将交易结果可靠的通知出去。 3、技术解决方向不同 可靠消息一致性要解决消息从发出到接收的一致性,即消息发出并且被接收到。 最大努力通知无法保证消息从发出到接收的一致性,只提供消息接收的可靠性机制。可靠机制是,最大努力的将消息通知给接收方,当消息无法被接收方接收时,由接收方主动查询消费(业务处理结果)。

解决方案

通过对最大努力通知的理解,采用MQ的ack机制就可以实现最大努力通知。 方案1 :
分布式事务 - 图22
本方案是利用MQ的ack机制由MQ向接收通知方发送通知,流程如下 : 1、发起通知方将通知发给MQ。 使用普通消息机制将通知发给MQ。 注意 :如果消息没有发出去可由接收通知方主动请求发起通知方查询业务执行结果。 2、接收通知方监听MQ。 3、接收通知方接收消息,业务处理完成回应ack。 4、接收通知方若没有回应ack则MQ会重复通知。 MQ会按照间隔1min、5min、10min、30min、1h、2h、5h、10h的方式,逐步拉大通知间隔(如果MQ采用rocketMq,在broker中可进行配置),直到达到通知要求的时间窗口上限。 5、接收通知方可通过消息校对接口来校对消息的一致性。 方案2 : 本方案也是利用MQ的ack机制,与方案1不同的是应用程序向接收通知方发送通知,如下图 :
分布式事务 - 图23
交互流程如下 : 1、发起通知方将通知发给MQ。 使用可靠消息一致方案中的事务消息保证本地事务与消息的原子性,最终将通知先发给MQ。 2、通知程序监听MQ,接收MQ的消息。 方案1中接收通知方直接监听MQ,方案2中由通知程序监听MQ。 通知程序若没有回应ack则MQ会重复通知。 3、通知程序通过互联网接口协议(如http、webservice)调用接收通知方案接口,完成通知。 通知程序调用接收通知方案接口成功就表示通知成功,即消费MQ消息成功,MQ将不再向通知程序投递通知消息。 4、接收通知方可通过消息校对接口来校对消息的一致性。 方案1和方案2的不同点 : 1、方案1中接收通知方与MQ接口,即接收通知方案监听MQ,此方案主要应用与内部应用之间的通知。 2、方案2中由通知程序与MQ接口,通知程序监听MQ,收到MQ的消息后由通知程序通过互联网接口协议调用接收通知方。此方案主要应用于外部应用之间的通知,例如支付宝、微信的支付结果通知。

RocketMQ实现最大努力通知型事务

业务说明

本实例通过RocketMq中间件实现最大努力通知型分布式事务,模拟充值过程。 本案例有账户系统和充值系统两个微服务,其中账户系统的数据库是bank1数据库,其中有张三账户。充值系统的数据库使用bank1_pay数据库,记录了账户的充值记录。 业务流程如下图 :
分布式事务 - 图24
交互流程如下 : 1、用户请求充值系统进行充值。 2、充值系统完成充值将充值结果发给MQ。 3、账户系统监听MQ,接收充值结果通知,如果接收不到消息,MQ会重复发送通知。接收到充值结果通知账户系统增加充值金额。 4、账户系统也可以主动查询充值系统的充值结果查询接口,增加金额。

程序组成部分

本示例程序组成部分如下 : 数据库:MySQL-5.7.25 包括bank1和bank1_pay两个数据库。 JDK:64位 jdk1.8.0_201 rocketmq 服务端:RocketMQ-4.5.0 rocketmq 客户端:RocketMQ-Spring-Boot-starter.2.0.2-RELEASE 微服务框架:spring-boot-2.1.3、spring-cloud-Greenwich.RELEASE 微服务及数据库的关系 : dtx/dtx-notifymsg-demo/dtx-notifymsg-demo-bank1 银行1,操作张三账户, 连接数据库bank1 dtx/dtx-notifymsg-demo/dtx-notifymsg-demo-pay 银行2,操作充值记录,连接数据库bank1_pay
分布式事务 - 图25
交互流程如下 : 1、用户请求充值系统进行充值。 2、充值系统完成充值将充值结果发给MQ。 3、账户系统监听MQ,接收充值结果通知,如果接收不到消息,MQ会重复发送通知。接收到充值结果通知账户系统增加充值金额。 4、账户系统也可以主动查询充值系统的充值结果查询接口,增加金额。

创建数据库

创建bank1库,并导入以下表结构和数据(包含张三账户)

CREATE DATABASE `bank1` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
DROP TABLE IF EXISTS `account_info`; CREATE TABLE `account_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '户 主姓名',
`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '银行卡号',
`account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'帐户密码',
`account_balance` double NULL DEFAULT NULL COMMENT '帐户余额', PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
INSERT INTO `account_info` VALUES (2, '张三的账户', '1', '', 10000);
DROP TABLE IF EXISTS `de_duplication`; CREATE TABLE `de_duplication` (
`tx_no` varchar(64) COLLATE utf8_bin NOT NULL, `create_time` datetime(0) NULL DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;

创建bank1_pay库,并导入以下表结构:

CREATE DATABASE `bank1_pay` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci'; CREATE TABLE `account_pay` (
`id` varchar(64) COLLATE utf8_bin NOT NULL,
`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账号', `pay_amount` double NULL DEFAULT NULL COMMENT '充值余额',
`result` varchar(20) COLLATE utf8_bin DEFAULT NULL COMMENT '充值结果:success,fail',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;

启动RocketMQ

rocketMQ启动方式与RocketMQ实现可靠消息最终一致性事务中完全一致

discover-server

discover-server是服务注册中心,测试工程将自己注册至discover-server。

工程概述

(1)父工程maven依赖说明 在dtx父工程中指定了SpringBoot和SpringCloud版本

<dependency> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring‐boot‐dependencies</artifactId>                          <version>2.1.3.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency> 
    <groupId>org.springframework.cloud</groupId> 
    <artifactId>spring‐cloud‐dependencies</artifactId>  <version>Greenwich.RELEASE</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>

在dtx-notifymsg-demo父工程中指定了rocketmq-spring-boot-starter的版本。

<dependency>
    <groupId>org.apache.rocketmq</groupId> 
    <artifactId>rocketmq‐spring‐boot‐starter</artifactId> 
    <version>2.0.2</version>
</dependency>

(2)配置rocketMQ 在application-local.properties中配置rocketMQ nameServer地址及生产组 :

rocketmq.producer.group = producer_bank2
rocketmq.name-server = 127.0.0.1:9876

dtx-notifydemo-pay

dtx-notifydemo-pay实现如下功能 : 1、充值接口; 2、充值完成要通知; 3、充值结果查询接口。 (2)Dao

@Mapper
@Component
public interface AccountPayDao {
@Insert("insert into account_pay(id,account_no,pay_amount,result) values(#{id},# {accountNo},#{payAmount},#{result})")
int insertAccountPay(@Param("id") String id,@Param("accountNo") String accountNo, @Param("payAmount") Double pay_amount,@Param("result") String result);
@Select("select id,account_no accountNo,pay_amount payAmount,result from account_pay where id=#{txNo}")
AccountPay findByIdTxNo(@Param("txNo") String txNo); 
}

(3)Service

@Service
@Slf4j
public class AccountPayServiceImpl implements AccountPayService{
@Autowired
RocketMQTemplate rocketMQTemplate;
@Autowired
AccountPayDao accountPayDao;
@Transactional
@Override
public AccountPay insertAccountPay(AccountPay accountPay) {
    int result = accountPayDao.insertAccountPay(accountPay.getId(),
    accountPay.getAccountNo(), accountPay.getPayAmount(), "success");
    if(result>0){ //发送通知
        rocketMQTemplate.convertAndSend("topic_notifymsg",accountPay);
        return accountPay; 
    }
    return null; 
}
    @Override
    public AccountPay getAccountPay(String txNo) {
        AccountPay accountPay = accountPayDao.findByIdTxNo(txNo);
        return accountPay; 
    }
}

(4)Controller

@RestController
public class AccountPayController {
@Autowired
AccountPayService accountPayService;
//充值
@GetMapping(value = "/paydo")
public AccountPay pay(AccountPay accountPay){
    //事务号
    String txNo = UUID.randomUUID().toString(); accountPay.setId(txNo);
    return accountPayService.insertAccountPay(accountPay);
}
//查询充值结果
@GetMapping(value = "/payresult/{txNo}")
public AccountPay payresult(@PathVariable("txNo") String txNo){
        return accountPayService.getAccountPay(txNo); 
    }
}

dtx-notifydemo-bank1

dtx-notifydemo-bank1实现如下功能 : 1、监听MQ,接收充值结果,根据充值结果完成账户金额修改。 2、主动查询充值系统,根据充值结果完成账户金额修改。 1)Dao

@Mapper
@Component
public interface AccountInfoDao {
    //修改账户金额
    @Update("update account_info set account_balance=account_balance+#{amount} where account_no=#{accountNo}")
    int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);
    //查询幂等记录,用于幂等控制
    @Select("select count(1) from de_duplication where tx_no = #{txNo}") 
    int isExistTx(String txNo);
    //添加事务记录,用于幂等控制
    @Insert("insert into de_duplication values(#{txNo},now());") 
    int addTx(String txNo);
}

2)AccountInfoService

@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
    @Autowired
    AccountInfoDao accountInfoDao;
    @Autowired
    PayClient payClient; /**
    * 更新帐号余额,并发送消息 *
    * @param accountChange */
    @Transactional
    @Override
    public void updateAccountBalance(AccountChangeEvent accountChange) {
    //幂等校验
    int existTx = accountInfoDao.isExistTx(accountChange.getTxNo()); if(existTx >0){
    log.info("已处理消息:{}", JSONObject.toJSONString(accountChange));
    return ; }
    //添加事务记录 accountInfoDao.addTx(accountChange.getTxNo()); //更新账户金额
    accountInfoDao.updateAccountBalance(accountChange.getAccountNo(),accountChange.getAmount()); }
    /**
    * 主动查询充值结果 *
    * @param tx_no */
    @Override
    public AccountPay queryPayResult(String tx_no) {
        //主动请求充值系统查询充值结果
        AccountPay accountPay = payClient.queryPayResult(tx_no); //充值结果
        String result = accountPay.getResult(); 
        log.info("主动查询充值结果:{}",         JSON.toJSONString(accountPay)); 
        if("success".equals(result)){
            AccountChangeEvent accountChangeEvent = new AccountChangeEvent();
            accountChangeEvent.setAccountNo(accountPay.getAccountNo());
            accountChangeEvent.setAmount(accountPay.getPayAmount());
            accountChangeEvent.setTxNo(accountPay.getId());
            updateAccountBalance(accountChangeEvent);
        }
        return accountPay; 
    }
}
@FeignClient(value = "dtx‐notifymsg‐demo‐pay", fallback = PayFallback.class)
public interface PayClient {
    @GetMapping("/pay/payresult/{txNo}")
    AccountPay queryPayResult(@PathVariable("txNo") String txNo); 
}
@Component
public class PayFallback implements PayClient {
    @Override
    public AccountPay queryPayResult(String txNo) {
        AccountPay accountPay = new AccountPay();
        accountPay.setResult("fail");
        return accountPay;
    } 
}

3)监听MQ

@Component
@Slf4j
@RocketMQMessageListener(topic="topic_notifymsg",consumerGroup="consumer_group_notifymsg_bank1") 
public class NotifyMsgListener implements RocketMQListener<AccountPay> {
    @Autowired
    AccountInfoService accountInfoService;
    @Override
    public void onMessage(AccountPay accountPay) {
        log.info("接收到消息:{}", JSON.toJSONString(accountPay)); 
        AccountChangeEvent accountChangeEvent = new AccountChangeEvent();
        accountChangeEvent.setAmount(accountPay.getPayAmount());
        accountChangeEvent.setAccountNo(accountPay.getAccountNo());
        accountChangeEvent.setTxNo(accountPay.getId());
        accountInfoService.updateAccountBalance(accountChangeEvent); 
        log.info("处理消息完成:{}", JSON.toJSONString(accountChangeEvent));
    } 
}

4)Controller

@RestController
@Slf4j
public class AccountInfoController {
    @Autowired
    private AccountInfoService accountInfoService;
        //主动查询充值结果
        @GetMapping(value = "/payresult/{txNo}")
        public AccountPay result(@PathVariable("txNo") String txNo){
        AccountPay accountPay = accountInfoService.queryPayResult(txNo);
        return accountPay; 
    }
}

测试场景

  • 充值系统充值成功,账户系统主动查询充值结果,修改账户金额。
  • 充值系统充值成功,发送消息,账户系统接收消息,修改账户金额。
  • 账户系统修改账户金额幂等测试。

    小结

    最大努力通知方案是分布式事务中对一致性要求最低的一种,适用于一些最终一致性时间敏感度低的业务; 最大努力通知方案需要实现如下功能 : 1、消息重复通知机制。 2、消息校对机制。

    综合案例分析

    P2P介绍

    P2P 金融又叫P2P信贷。其中P2P是 peer-to-peer 或 person-to-person 的简写,意思是:个人对个人。P2P金融指个人与个人间的小额借贷交易,一般需要借助电子商务专业网络平台帮助借贷双方确立借贷关系并完成相关交易手续。借款者可自行发布借款信息,包括金额、利息、还款方式和时间,实现自助式借款;投资者根据借款人发布的信息,自行决定出借金额,实现自助式借贷。目前,国家对P2P行业的监控与规范性控制越来越严格,出台了很多政策来对其专项整治。并主张采用“银行存管模式”来规避P2P平台挪用借投人资金的风险,通过银行开发的“银行存管系统”管理投资者的资金,每位P2P平台用户在银行的存管系统内都会有一个独立账号,平台来管理交易,做到资金和交易分开,让P2P平台不能接触到资金,就可以一定程度避免资金被挪用的风险。
    什么是银行存管模式?
    银行存管模式涉及到2套账户体系,P2P平台和银行各一套账户体系。投资人在P2P平台注册后,会同时跳转到银行再开一个电子账户,2个账户间有一一对应的关系。当投资人投资时,资金进入的是平台在银行为投资人开设的二级账户中,每一笔交易,是由银行在投资人与借款人间的交易划转,P2P平台仅能看到信息的流动。
    分布式事务 - 图26

总体业务流程

分布式事务 - 图27

术语描述银行存管模式这种模式下,涉及到2套账户系统,P2P平台和银行各一套账户体系。投资人在P2P平台注册后,会同时跳转到银行再开一个电子账户,2个账户间有一一对应的关系。当投资人投资时,资金进入的是平台在银行为投资人开设的二级账户中,每一笔交易,由银行在投资人与借款人间的交易划转,P2P平台仅能看到信息的流动。标的P2P业内,习惯把借款人发布的投资项目称为“标的”。发标借款人在P2P平台中创建并发布“标的”过程。投标投资人在认可相关借款人之后进行额一种借贷行为,对自己中意的借款标的进行投资操作,一个借款标可由单个投资人或多个投资人承接。满标单笔借款标筹集齐所有借款资金即为满标,计息时间是以标满当日开始计息,投资人较多的平台多数会当天满标。
7.1.2.模块说明
统一账户服务
用户的登录账户、密码、角色、权限、资源等系统级信息的管理,不包含用户业务信息。
用户中心
提供用户业务信息的管理,如会员信息、实名认证信息、绑定银行卡信息等,“用户中心”的每个用户与“统一账户服务”中的账号关联。
交易中心
提供发标、投标等业务。
还款服务
提供还款计划的生成、执行、记录与归档。
银行存管系统(模拟)
模拟银行存管系统,进行资金的存管,划转。

注册账号案例分析

业务流程

采用用户、账号分离设计(这样设计的好处是,当用户的业务信息发生变化时,不会影响的认证、授权等系统机制),因此需要保证用户信息与账号信息的一致性。
分布式事务 - 图28

解决方案分析

针对注册业务,如果用户与账号信息不一致,则会导致严重问题,因此该业务对一致性要求较为严格,即当用户服务和账号服务任意一方出现问题都需要回滚事务。
根据上述需求进行解决方案分析 :
1、采用可靠消息一致性方案
可靠消息一致性要求只要消息发出,事务参与者接到消息就要将事务执行成功,不存在回滚的要求,所以不适用。
2、采用最大努力通知方案
最大努力通知表示发起通知执行完本地事务后将结果通知给事务参与者,即使事务参与者执行业务处理失败发起通知方也不会回滚事务,所以不适用。
3、采用Seata实现2PC
在用户中心发起全局事务,统一账户服务为事务参与者,用户中心和统一账户服务只要有一方出现问题则全局事务回滚,符合要求。
实现方法如下 :
1、用户中心添加用户信息,开启全局事务
2、统一账号服务添加账号信息,作为事务参与者
3、其中一方执行失败Seata对SQL进行逆操作删除用户信息和账号信息,实现回滚。
4、采用Hmily实现TCC
TCC也可以实现用户中心和统一账户服务只要有一方出现问题则全局事务回滚,符合要求。
实现方法如下 :
1、用户中心
try :添加用户,状态为不可用
confirm :更新用户账户为可用
cancel :删除用户
2、统一账号服务
try :添加账号,状态不可用
confirm :更新账号状态为可用
cancel :删除账号

存管开户

业务流程

根据政策要求,P2P业务必须让银行存管资金,用户的资金在银行存管系统的账户中,而不在P2P平台中,因此用户要在银行存管系统开户。
分布式事务 - 图29
用户向用户中心提交开户资源,用户中心生成开户请求号并重定向至银行存管系统开户页面。用户设置存管密码并确认开户后,银行存管立即返回“请求已受理”。在某一个时刻,银行存管系统处理完该开户请求后,将调用回调地址通知处理结果,若通知失败,则按一定策略重试通知。同时,银行存管系统应提供开户结果查询的接口,供用户中心校对结果。

解决方案分析

P2P平台的用户中心与银行存管系统之间属于跨系统交互,银行存管系统属于外部系统,用户中心无法干预银行存管系统,所以用户中心只能在收到银行存管系统的业务处理结果通知后积极处理,开户后的使用情况完全由用户中心开控制。
根据上述需求进行解决方案分析 :
1、采用Seata实现2PC
需要侵入银行存管系统的数据库,由于它的外部系统,所以不适用。
2、采用Hmily实现TCC
TCC侵入性更强,所以不适用。
3、基于MQ的可靠消息一致性
如果让银行存管系统监听MQ则不合适,因为它的外部系统。如果银行存管系统将消息发给MQ用户中心监听MQ是可以的,但是由于相对银行存管系统来说用户中心属于外部系统,银行存管系统是不会让外部系统直接监听自己的MQ的,基于MQ的通信协议也不方便外部系统间的交互。所以本方案不适合。
4、最大努力通知方案
银行存管系统内部使用MQ,银行存管系统处理完业务后将处理结果发给MQ,由银行存管的通知程序专门发送通知,并且采用互联网协议通知给第三方系统(用户中心)。
下图中发起通知即银行存管系统 :
分布式事务 - 图30

满标审核

业务流程

在借款人标的募集够所有的资金后,P2P运营管理员审批该标的,触发放款,并开启还款流程。
分布式事务 - 图31
管理员对某标的满标审批通过,交易中心修改标的状态为“还款中”,同时要通知还款服务生成还款计划。

解决方案分析

生成还款计划是一个执行时长较长的业务,不建议阻塞主业务流程,此业务对一致性要求较低。
根据上述需求进行解决方案分析 :
1、采用Seata实现2PC
Seata在事务执行过程会进行数据库资源锁定,由于事务执行时长较长会将资源锁定较长时间,所以不适用。
2、采用Hmily实现TCC
本需求对业务一致性要求较低,因为生成还款计划的时长较长,所以不要求交易中心修改标的状态为“还款中”就立即生成还款计划,所以本方案不适用。
3、基于MQ的可靠消息一致性
满标批通过后由交易中心修改标的状态为“还款中”并且向还款服务发送消息,还款服务接收到消息开始生成还款计划,基于MQ的可靠消息一致性方案适用此场景。
4、最大努力通知方案
满标审批通过后交易中心向还款服务发送通知要求生成还款计划,还款服务并且对外提供还款计划生成结果校对接口供其他服务查询,最大努力通知也适用本场景。
分布式事务对比分析 :
各种方案的优缺点 :
2PC最大的诟病是阻塞协议。RM在执行分支事务后需要等待TM的决定,此时服务会阻塞并锁定资源。由于其阻塞机制和最差时间复杂度高,因此,这种设计不能适应随着事务涉及的服务数量增加而扩展的需要,很难用于并发较高以及子事务生命周期较长(long-running
transactions)的分布式服务中。
如果拿TCC事务的处理流程与2PC两阶段提交做比较,2PC通常都是在跨库的DB层面,而TCC则在应用层面的处理,需要通过业务逻辑来实现。这种分布式事务的实现方式的优势在于,可以让应用自己定义数据操作的粒度,使得降低锁冲突,提高吞吐量成为可能。
而不足之处则在于对应用的侵入性非常强,业务逻辑的每个分支都需要实现try、confirm、cancel三个操作。此外,其实现难度也比较大,需要按照网络状态、系统故障等不同的失败原因实现不同的回滚策略。典型的使用场景 :满,登录送优惠卷等。
可靠消息最终一致性事务适合执行周期长且实时性要求不高的场景。引入消息机制后,同步的事务操作变为基于消息执行的异步操作,避免来分布式事务中的同步阻塞操作的影响,并实现来两个服务的解耦。典型的使用场景 :注册送积分,登录送优惠卷等。
最大努力通知是分布式事务中要求最低的一种,适用于一些最终一致性时间敏感度低的业务;允许发起通知方处理业务失败,在接收通知发收到通知后积极进行失败处理,无论发起通知方如何处理结果都不会影响到接收通知方的后续处理;发起通知方需提供查询执行情况接口,用于接收通知方校对结果。典型的使用场景 :银行通知、支付结果通知等。
分布式事务 - 图32

总结

在条件允许的情况下,我们尽可能选择本地事务单数据源,因为它减少来网络交互带来的性能损耗,且避免来数据若一致性带来的种种问题。若某系统频繁不合理的使用分布式事务,应首先从整体设计角度观察服务的拆分是否合理,是否高内聚低耦合?是否粒度太小?分布式事务一直是业界难题,因为网络的不确定性,而且我们习惯于拿分布式事务与单机事务ACID做对比。
无论是数据库曾的XA、还是应用层TCC、可靠消息、最大努力通知等方案,都没有完美解决分布式事务问题,他们不过是各自在性能、一致性、可用性等方面做取舍,寻求某些场景偏好下的权衡。

引用

https://zhuanlan.zhihu.com/p/911181872