分布式全局ID

  • 分库分表系统中,由于id引发的问题

    • 每个表都有唯一标识,通常使用id

    • id通常采用自增的方式

    • 在分库分表情况下,每张表的id都从0开始自增

    • 不同的分片上,id可能重复

    • 导致id在全局不唯一,导致业务上出现问题

    • 两个分片表中存在相同的order_id,导致业务混乱 分布式全局ID、分布式事务学习 - 图1

分布式全局ID-UUID

  • UUID通过唯一识别码(Universally Unique Identifier)

  • 使用UUID,保证每一条记录的id都是不同的

  • 缺点:

    • 只是单纯的一个id,没有实际意义
    • 长度32位,太长
  • MyCat不支持UUID的方式

  • Sharding-Jdbc支持UUID方式

    1. #指定Order表的order_id主键id生成策略为UUID
    2. spring.shardingsphere.sharding.tables.t_order.key-generator.column=order_id
    3. spring.shardingsphere.sharding.tables.t_order.key-generator.type=UUID
    4. #订单表根据order_id自定义分片规则
    5. spring.shardingsphere.sharding.tables.t_order.table-strategy.standard.sharding-column=order_id
    6. #精确分片策略
    7. spring.shardingsphere.sharding.tables.t_order.table-strategy.standard.precise-algorithm-class-name=com.yy.shardingjdbcdemo.sharding.MySharding
  1. public class MyShardingString implements PreciseShardingAlgorithm<String> {
  2. @Override
  3. public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<String> shardingValue) {
  4. String id = shardingValue.getValue();
  5. // orderId的hashcode值 对 节点个数 取模
  6. int mode = id.hashCode() % availableTargetNames.size();
  7. // 分片节点数组
  8. String[] strings = availableTargetNames.toArray(new String[0]);
  9. mode = Math.abs(mode);
  10. System.out.println(strings[0] + "---------" + strings[1]);
  11. System.out.println("mode=" + mode);
  12. return strings[mode];
  13. }
  14. }

分布式全局ID-统一ID序列表

  • ID的值统一的从一个集中的ID序列生成器中获取 分布式全局ID、分布式事务学习 - 图2

  • ID序列生成器 MyCat支持,Sharding-Jdbc不支持

  • MyCat中有两种方式:

    • 本地文件方式
    • 数据库方式
  • 本地文件方式用于测试、数据库方式用于生产

  • 优点:ID集中管理,避免重复

  • 缺点:并发量大时,ID生成器压力较大

分布式全局ID-雪花算法

  • SnowFlake是由Twitter提出的分布式ID算法

  • 一个64 bit的long型的数字 分布式全局ID、分布式事务学习 - 图3

  • 引入了时间戳,保持自增

  • 基本保持全局唯一,毫秒内并发最大4096个ID

  • 时间回调 可能引起ID重复

  • MyCat和Sharding-Jdbc均支持雪花算法

  • Sharding-Jdbc可设置最大容忍回调时间

  • MyCat使用雪花算法

  • Sharding-Jdbc使用雪花算法

分布式全局ID方案落地

  • Java代码定义了一个全局ID生成器

分布式事务

  • 分布式事务问题

    • 分布式系统中,业务拆分成多个数据库

    • 多个独立的数据库之间,无法统一事务

    • 造成数据不一致的情况 分布式全局ID、分布式事务学习 - 图4

  • CAP原理

  • ACID原理与BASE原理

基于XA协议的两阶段提交

  • XA是由X/Open组织提出的分布式事务的规范

  • 由一个事务管理器(TM)和多个资源管理器(RM)组成

  • 提交分为两个阶段:

    • prepare 分布式全局ID、分布式事务学习 - 图5

    • commit 分布式全局ID、分布式事务学习 - 图6

  • 保证数据的强一致性

  • commit阶段出现问题,事务出现不一致,需人工处理

  • 效率低下,性能与本地事务相差10倍

  • MySql5.7及以上版本均支持XA协议

  • MySql Connector/J 5.0以上 支持XA协议

  • Java系统中,数据源采用Atomikos(充当事务管理器)

    • pom依赖
      1. <dependency>
      2. <groupId>org.springframework.boot</groupId>
      3. <artifactId>spring-boot-starter-jta-atomikos</artifactId>
      4. </dependency>
  • 定义RM资源管理器

    • db131 ```java Configuration @MapperScan(value = “com.yy.xademo.db131.dao”, sqlSessionFactoryRef = “sqlSessionFactoryBean131”) public class ConfigDb131 {

    /**

    • 资源XA资源管理器 *
    • @return */ @Bean(“db131”) public DataSource db131() { MysqlXADataSource xaDataSource = new MysqlXADataSource(); xaDataSource.setUser(“root”); xaDataSource.setPassword(“root”); xaDataSource.setUrl(“jdbc:mysql://192.168.73.131:3306/xa_131”);

      AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean(); atomikosDataSourceBean.setXaDataSource(xaDataSource);

      return atomikosDataSourceBean; }

    @Bean(“sqlSessionFactoryBean131”) public SqlSessionFactoryBean sqlSessionFactoryBean(@Qualifier(“db131”) DataSource dataSource) throws IOException { SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean(); sqlSessionFactoryBean.setDataSource(dataSource); ResourcePatternResolver resourceResolver = new PathMatchingResourcePatternResolver(); sqlSessionFactoryBean.setMapperLocations(resourceResolver.getResources(“mybatis/db131/*.xml”)); return sqlSessionFactoryBean; }

  1. /**
  2. * 配置XA事务管理器
  3. */
  4. @Bean("xaTransaction")
  5. public JtaTransactionManager jtaTransactionManager() {
  6. UserTransaction userTransaction = new UserTransactionImp();
  7. UserTransactionManager userTransactionManager = new UserTransactionManager();
  8. return new JtaTransactionManager(userTransaction, userTransactionManager);
  9. }

}

  1. -
  2. db132
  3. ```java
  4. @Configuration
  5. @MapperScan(value = "com.yy.xademo.db132.dao",sqlSessionFactoryRef = "sqlSessionFactoryBean132")
  6. public class ConfigDb132 {
  7. @Bean("db132")
  8. public DataSource db132(){
  9. MysqlXADataSource xaDataSource = new MysqlXADataSource();
  10. xaDataSource.setUser("root");
  11. xaDataSource.setPassword("root");
  12. xaDataSource.setUrl("jdbc:mysql://192.168.73.132:3306/xa_132");
  13. AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
  14. atomikosDataSourceBean.setXaDataSource(xaDataSource);
  15. return atomikosDataSourceBean;
  16. }
  17. @Bean("sqlSessionFactoryBean132")
  18. public SqlSessionFactoryBean sqlSessionFactoryBean(@Qualifier("db132") DataSource dataSource) throws IOException {
  19. SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
  20. sqlSessionFactoryBean.setDataSource(dataSource);
  21. ResourcePatternResolver resourceResolver = new PathMatchingResourcePatternResolver();
  22. sqlSessionFactoryBean.setMapperLocations(resourceResolver.getResources("mybatis/db132/*.xml"));
  23. return sqlSessionFactoryBean;
  24. }
  25. }
  1. -

db131config里配置TM事务管理器

  1. /**
  2. * 配置XA事务管理器
  3. */
  4. @Bean("xaTransaction")
  5. public JtaTransactionManager jtaTransactionManager() {
  6. UserTransaction userTransaction = new UserTransactionImp();
  7. UserTransactionManager userTransactionManager = new UserTransactionManager();
  8. return new JtaTransactionManager(userTransaction, userTransactionManager);
  9. }

MyCat分布式事务

  • vim server.xml

    • handleDistributedTransactions

      • 0:支持分布式事务
      • 1:不支持分布式事务
        1. <!--分布式事务开关,0为不过滤分布式事务,1为过滤分布式事务(如果分布式事务内只涉及全局表,则不过滤),2为不过滤分布式事务,但是记录分布式事务日志-->
        2. <property name="handleDistributedTransactions">0</property>
  • 代码编写

    1. @Service
    2. public class UserService {
    3. @Resource
    4. private UserMapper userMapper;
    5. @Transactional(rollbackFor = Exception.class)
    6. public void testUser() {
    7. User user1 = new User();
    8. user1.setId(1);
    9. user1.setUsername("奇数");
    10. userMapper.insert(user1);
    11. User user2 = new User();
    12. user2.setId(2);
    13. user2.setUsername("偶数111");
    14. userMapper.insert(user2);
    15. }
    16. }

Sharding-Jdbc分布式事务

  • Sharding-Jdbc默认实现分布式事务

TCC事务补偿机制

  • 什么是事务补偿机制?

  • 针对每个操作,都要注册一个与其对应的补偿(撤销)操作

  • 在执行失败时,调用补偿操作,撤销之前的操作

  • A给B转账的例子,A和B在两家不同的银行

  • A账户减200元,B账户加200元

  • 两个操作要保证原子性,要么全成功、要么全失败

  • 由于A和B在两家不同的银行,所以存在分布式事务问题

  • 转账接口需要提供补偿机制

  • 如果A在扣减的过程出现问题,直接抛出异常,事务回滚

  • B在增加余额的过程中,出现问题,要调用A的补偿接口

  • A之前的扣减操作,得到了补偿,进行了撤销 分布式全局ID、分布式事务学习 - 图7

  • 优点:逻辑清晰、流程简单

  • 缺点:数据一致性比XA还要差,可能出错的点比较多

  • TCC属于应用层的一种补偿方式,程序员需要写大量代码

  • 代码示例:

    • db131的数据库配置

      1. @Configuration
      2. @MapperScan(value = "com.yy.tccdemo.db131.dao",sqlSessionFactoryRef = "factoryBean131")
      3. public class ConfigDb131 {
      4. @Bean("db131")
      5. public DataSource db131() {
      6. MysqlDataSource dataSource = new MysqlDataSource();
      7. dataSource.setUser("root");
      8. dataSource.setPassword("root");
      9. dataSource.setUrl("jdbc:mysql://192.168.73.131:3306/xa_131");
      10. return dataSource;
      11. }
      12. @Bean("factoryBean131")
      13. public SqlSessionFactoryBean factoryBean(@Qualifier("db131") DataSource dataSource) throws IOException {
      14. SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
      15. factoryBean.setDataSource(dataSource);
      16. ResourcePatternResolver resourceResolver = new PathMatchingResourcePatternResolver();
      17. factoryBean.setMapperLocations(resourceResolver.getResources("mybatis/db131/*.xml"));
      18. return factoryBean;
      19. }
      20. @Bean("tm131")
      21. public PlatformTransactionManager transactionManager(@Qualifier("db131") DataSource dataSource) {
      22. return new DataSourceTransactionManager(dataSource);
      23. }
      24. }
  • db132的数据库配置
  1. @Configuration
  2. @MapperScan(value = "com.yy.tccdemo.db132.dao",sqlSessionFactoryRef = "factoryBean132")
  3. public class ConfigDb132 {
  4. @Bean("db132")
  5. public DataSource db132() {
  6. MysqlDataSource dataSource = new MysqlDataSource();
  7. dataSource.setUser("root");
  8. dataSource.setPassword("root");
  9. dataSource.setUrl("jdbc:mysql://192.168.73.132:3306/xa_132");
  10. return dataSource;
  11. }
  12. @Bean("factoryBean132")
  13. public SqlSessionFactoryBean factoryBean(@Qualifier("db132") DataSource dataSource) throws IOException {
  14. SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
  15. factoryBean.setDataSource(dataSource);
  16. ResourcePatternResolver resourceResolver = new PathMatchingResourcePatternResolver();
  17. factoryBean.setMapperLocations(resourceResolver.getResources("mybatis/db132/*.xml"));
  18. return factoryBean;
  19. }
  20. @Bean("tm132")
  21. public PlatformTransactionManager transactionManager(@Qualifier("db132") DataSource dataSource) {
  22. return new DataSourceTransactionManager(dataSource);
  23. }
  24. }
  • java代码

    1. @Service
    2. public class AccountService {
    3. @Resource
    4. private AccountAMapper accountAMapper;
    5. @Resource
    6. private AccountBMapper accountBMapper;
    7. @Transactional(transactionManager = "tm131",rollbackFor = Exception.class)
    8. public void transferAccount(){
    9. AccountA accountA = accountAMapper.selectByPrimaryKey(1);
    10. accountA.setBalance(accountA.getBalance().subtract(new BigDecimal(200)));
    11. accountAMapper.updateByPrimaryKey(accountA);
    12. AccountB accountB = accountBMapper.selectByPrimaryKey(2);
    13. accountB.setBalance(accountB.getBalance().add(new BigDecimal(200)));
    14. accountBMapper.updateByPrimaryKey(accountB);
    15. try{
    16. int i = 1/0;
    17. }catch (Exception e){
    18. try{
    19. AccountB accountb = accountBMapper.selectByPrimaryKey(2);
    20. accountb.setBalance(accountb.getBalance().subtract(new BigDecimal(200)));
    21. accountBMapper.updateByPrimaryKey(accountb);
    22. }catch (Exception e1){
    23. }
    24. throw e;
    25. }
    26. }
    27. }

基于本地消息表+定时任务的最终一致性方案

  • 采用BASE原理,保证事务最终一致

  • 在一致性方面,允许一段时间内的不一致,但最终会一致

  • 在实际的系统当中,要根据具体情况,判断是否采用

  • 基于本地消息表的方案中,将本事务外操作,记录在消息表中

  • 其他事务,提供操作接口

  • 定时任务轮询本地消息表,将未执行的消息发送给操作接口

  • 操作接口处理成功,返回成功标识,处理失败返回失败标识

  • 定时任务接到标识,更新消息的状态

  • 定时任务按照一定的周期反复执行

  • 对于屡次失败的消息,可以设置最大失败次数

  • 超过最大失败次数的消息,不再进行接口调用

  • 等待人工处理 分布式全局ID、分布式事务学习 - 图8

  • 优点:避免了分布式事务,实现了最终一致性

  • 缺点:要注意重试时的幂等性操作

  • 本地消息表-数据库设计 分布式全局ID、分布式事务学习 - 图9

  • 本地消息表-接口设计

    1. /**
    2. * 支付接口
    3. * @param userId
    4. * @param orderId
    5. * @param amount
    6. * @return 0:成功;1:用户不存在;2:余额不足
    7. */
    8. @Transactional(transactionManager = "tm131")
    9. public int pament(int userId, int orderId, BigDecimal amount){
    10. //支付操作
    11. AccountA accountA = accountAMapper.selectByPrimaryKey(userId);
    12. if (accountA == null) return 1;
    13. if (accountA.getBalance().compareTo(amount) < 0) return 2;
    14. accountA.setBalance(accountA.getBalance().subtract(amount));
    15. accountAMapper.updateByPrimaryKey(accountA);
    16. PaymentMsg paymentMsg = new PaymentMsg();
    17. paymentMsg.setOrderId(orderId);
    18. paymentMsg.setStatus(0);//未发送
    19. paymentMsg.setFalureCnt(0);//失败次数
    20. paymentMsg.setCreateTime(new Date());
    21. paymentMsg.setCreateUser(userId);
    22. paymentMsg.setUpdateTime(new Date());
    23. paymentMsg.setUpdateUser(userId);
    24. paymentMsgMapper.insertSelective(paymentMsg);
    25. return 0;
    26. }
  • 本地消息表-订单操作接口

    1. /**
    2. * 订单回调接口
    3. *
    4. * @param orderId
    5. * @return 0:成功 1:订单不存在
    6. */
    7. public int handleOrder(int orderId) {
    8. Order order = orderMapper.selectByPrimaryKey(orderId);
    9. if (order == null) return 1;
    10. order.setOrderStatus(1);//已支付
    11. order.setUpdateTime(new Date());
    12. order.setUpdateUser(0);//系统更新
    13. orderMapper.updateByPrimaryKey(order);
    14. return 0;
    15. }
  • 本地消息表-定时任务 ```java @Service public class OrderScheduler { @Resource private PaymentMsgMapper paymentMsgMapper;

    @Scheduled(cron = “0/10 ?”) public void orderNotify() throws IOException {

    1. PaymentMsgExample paymentMsgExample = new PaymentMsgExample();
    2. paymentMsgExample.createCriteria().andStatusEqualTo(0);//未发送
    3. List<PaymentMsg> paymentMsgs = paymentMsgMapper.selectByExample(paymentMsgExample);
    4. if (paymentMsgs==null || paymentMsgs.size() ==0) return;
    5. for (PaymentMsg paymentMsg : paymentMsgs) {
    6. int order = paymentMsg.getOrderId();
    7. CloseableHttpClient httpClient = HttpClientBuilder.create().build();
    8. HttpPost httpPost = new HttpPost("http://localhost:8080/handleOrder");
    9. NameValuePair orderIdPair = new BasicNameValuePair("orderId",order+"");
    10. List<NameValuePair> list = new ArrayList<>();
    11. list.add(orderIdPair);
    12. HttpEntity httpEntity = new UrlEncodedFormEntity(list);
    13. httpPost.setEntity(httpEntity);
  1. CloseableHttpResponse response = httpClient.execute(httpPost);
  2. String s = EntityUtils.toString(response.getEntity());
  3. if ("success".equals(s)){
  4. paymentMsg.setStatus(1);//发送成功
  5. paymentMsg.setUpdateTime(new Date());
  6. paymentMsg.setUpdateUser(0);//系统更新
  7. paymentMsgMapper.updateByPrimaryKey(paymentMsg);
  8. }else {
  9. Integer falureCnt = paymentMsg.getFalureCnt();
  10. falureCnt++;
  11. paymentMsg.setFalureCnt(falureCnt);
  12. if (falureCnt > 5){
  13. paymentMsg.setStatus(2);//失败
  14. }
  15. paymentMsg.setUpdateTime(new Date());
  16. paymentMsg.setUpdateUser(0);//系统更新
  17. paymentMsgMapper.updateByPrimaryKey(paymentMsg);
  18. }
  19. }
  20. }

}

  1. <a name="44c54314"></a>
  2. # 基于MQ消息队列的最终一致性方案
  3. -
  4. 原理、流程与本地消息表类似
  5. ![](/Users/liuyuyan/Library/Application Support/typora-user-images/image-20200502172730923.png#alt=image-20200502172730923)
  6. -
  7. 不同点:
  8. - 本地消息改为MQ
  9. - 定时任务改为MQ的消费者
  10. -
  11. 优点:不依赖定时任务,基于MQ更高效、更可靠
  12. -
  13. 适合于公司内的系统
  14. -
  15. 不同公司之间无法基于MQ,本地消息表更合适
  16. -
  17. Rocketmq下载安装
  18. -
  19. 引入pom依赖
  20. ```xml
  21. <dependency>
  22. <groupId>org.apache.rocketmq</groupId>
  23. <artifactId>rocketmq-client</artifactId>
  24. <version>4.5.2</version>
  25. </dependency>
  • 定义生产者
    1. @Bean(initMethod = "start",destroyMethod = "shutdown")
    2. public DefaultMQProducer producer() {
    3. DefaultMQProducer producer = new
    4. DefaultMQProducer("paymentGroup");
    5. // Specify name server addresses.
    6. producer.setNamesrvAddr("localhost:9876");
    7. return producer;
    8. }
  • 定义消费者

    1. @Bean(initMethod = "start",destroyMethod = "shutdown")
    2. public DefaultMQPushConsumer consumer(@Qualifier("messageListener") MessageListenerConcurrently messageListener) throws MQClientException {
    3. DefaultMQPushConsumer consumer = new
    4. DefaultMQPushConsumer("paymentConsumerGroup");
    5. // Specify name server addresses.
    6. consumer.setNamesrvAddr("localhost:9876");
    7. // Subscribe one more more topics to consume.
    8. consumer.subscribe("payment", "*");
    9. consumer.registerMessageListener(messageListener);
    10. return consumer;
    11. }
  • 支付接口-消息队列

    1. /**
    2. * 支付接口(消息队列)
    3. * @param userId
    4. * @param orderId
    5. * @param amount
    6. * @return 0:成功;1:用户不存在;2:余额不足
    7. */
    8. @Transactional(transactionManager = "tm131",rollbackFor = Exception.class)
    9. public int pamentMQ(int userId, int orderId, BigDecimal amount) throws Exception {
    10. //支付操作
    11. AccountA accountA = accountAMapper.selectByPrimaryKey(userId);
    12. if (accountA == null) return 1;
    13. if (accountA.getBalance().compareTo(amount) < 0) return 2;
    14. accountA.setBalance(accountA.getBalance().subtract(amount));
    15. accountAMapper.updateByPrimaryKey(accountA);
    16. Message message = new Message();
    17. message.setTopic("payment");
    18. message.setKeys(orderId+"");
    19. message.setBody("订单已支付".getBytes());
    20. try {
    21. SendResult result = producer.send(message);
    22. if (result.getSendStatus() == SendStatus.SEND_OK){
    23. return 0;
    24. }else {
    25. throw new Exception("消息发送失败!");
    26. }
    27. } catch (Exception e) {
    28. e.printStackTrace();
    29. throw e;
    30. }
    31. }
  • 消费者消费

    1. Component("messageListener")
    2. public class ChangeOrderStatus implements MessageListenerConcurrently {
    3. @Resource
    4. private OrderMapper orderMapper;
    5. @Override
    6. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
    7. ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    8. if (list == null || list.size()==0) return CONSUME_SUCCESS;
    9. for (MessageExt messageExt : list) {
    10. String orderId = messageExt.getKeys();
    11. String msg = new String(messageExt.getBody());
    12. System.out.println("msg="+msg);
    13. Order order = orderMapper.selectByPrimaryKey(Integer.parseInt(orderId));
    14. if (order==null) return RECONSUME_LATER;
    15. try {
    16. order.setOrderStatus(1);//已支付
    17. order.setUpdateTime(new Date());
    18. order.setUpdateUser(0);//系统更新
    19. orderMapper.updateByPrimaryKey(order);
    20. }catch (Exception e){
    21. e.printStackTrace();
    22. return RECONSUME_LATER;
    23. }
    24. }
    25. return CONSUME_SUCCESS;
    26. }
    27. }

分布式事务技术落地

  • 创建订单减库存,直接使用MyCat分布式事务