Preface

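The problem left over from the previous article was that sharding-jdbc cannot shard a table dynamically by a field whose set of values is not fixed (the car park id), because actual-data-nodes is loaded once when the project starts and cannot be modified at runtime.
Fortunately, all of this can be solved.
This article solves the problem of modifying actual-data-nodes dynamically. In short, the solution combines sharding-jdbc, sharding's orchestration and governance feature, and redis: the order table is sharded dynamically by car park id, and whenever a car park is added or removed, actual-data-nodes is changed on the fly without restarting the project.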

Approach

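First I consulted this blog post: https://blog.csdn.net/qq_32588349/article/details/99440985, which gave me a lot of inspiration.
According to the official documentation, shardingsphere provides governance features built around a config center and a registry center. The documentation also contains this description:
[Figure 1: sharding-jdbc, how to modify table-sharding rules at runtime (without restarting the project)]
This looked like exactly what I wanted, but I really didn't know where to start, so I asked our company's architect to work it out for me, and we ended up with the solution below. Many thanks to both of them!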

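A rough description of the business in this demo:

  1. sharding-jdbc: use the Java-based configuration package (not the starter package)
  2. Table to shard: the order table (t_order)
  3. Table-sharding column: car park id (car_park_id)
  4. Database-sharding column: none, database sharding is not needed
  5. The initialized database needs a default table, t_order_default. It exists only as a fallback for the first startup of the project, when there is no car park information yet; no data is ever stored in it
  6. A redis hash stores <car park id, car park name>
  7. A redis zset stores the order actual-data-nodes values, scored by the timestamp at the time of writing, so the most recent one is easy to fetch and replace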
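
To make items 6 and 7 concrete, here is a minimal sketch of how the car park ids could be turned into an actual-data-nodes string and how the newest version could be picked by timestamp. It uses plain Java collections in place of redis; the class name, the method names and the sample ids are made up for illustration and are not taken from the demo project.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

// Sketch only: plain collections stand in for the redis hash and zset.
class ActualDataNodesSketch {

    // Joins "<dataSource>.<logicTable>_<carParkId>" entries with commas.
    static String buildActualDataNodes(String dataSource, String logicTable, List<String> carParkIds) {
        List<String> nodes = new ArrayList<>();
        for (String id : carParkIds) {
            nodes.add(dataSource + "." + logicTable + "_" + id);
        }
        return String.join(",", nodes);
    }

    // Stand-in for the zset: the key is the score (a timestamp), so the
    // entry with the highest key is the most recently stored value.
    static String latest(TreeMap<Long, String> history) {
        return history.isEmpty() ? null : history.lastEntry().getValue();
    }

    public static void main(String[] args) {
        TreeMap<Long, String> history = new TreeMap<>();
        history.put(1L, "ds0.t_order_default");
        history.put(2L, buildActualDataNodes("ds0", "t_order", Arrays.asList("1001", "1002")));
        System.out.println(latest(history));
    }
}
```

Keeping every version in a sorted structure means the value currently in effect can always be looked up when it has to be replaced with a newer one.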

Configuration steps

SQL script

    /*
    Navicat MySQL Data Transfer
    Source Server         : dev database 4.71
    Source Server Version : 50730
    Source Host           : 192.168.4.71:3307
    Source Database       : sharding_carpark
    Target Server Type    : MYSQL
    Target Server Version : 50730
    File Encoding         : 65001
    Date: 2020-05-11 18:25:00
    */
    SET FOREIGN_KEY_CHECKS=0;

    -- ----------------------------
    -- Table structure for t_car_park
    -- ----------------------------
    DROP TABLE IF EXISTS `t_car_park`;
    CREATE TABLE `t_car_park` (
      `id` varchar(64) NOT NULL,
      `name` varchar(100) DEFAULT NULL COMMENT 'name',
      `create_time` datetime DEFAULT NULL COMMENT 'creation time',
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='car park table';

    -- ----------------------------
    -- Table structure for t_order_default
    -- ----------------------------
    DROP TABLE IF EXISTS `t_order_default`;
    CREATE TABLE `t_order_default` (
      `id` varchar(64) NOT NULL,
      `name` varchar(100) DEFAULT NULL COMMENT 'name',
      `car_park_id` varchar(64) DEFAULT NULL COMMENT 'car park id',
      `no` varchar(100) DEFAULT NULL COMMENT 'order number',
      `create_time` datetime DEFAULT NULL COMMENT 'creation time',
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='sharding test table';
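
The script only creates the default table. Each car park presumably also needs its own physical table (t_order_<car park id>) before the sharding rule can point at it. The following is a rough sketch of how such a statement could be generated, assuming the new table simply copies the structure of t_order_default. The class, the method and the id whitelist are hypothetical and not part of the demo project.

```java
import java.util.regex.Pattern;

// Sketch only: builds the DDL for one car park's order table.
class OrderTableDdlSketch {

    // Assumption: ids consist of letters, digits and underscores only.
    // The id is concatenated into a table name, so anything else is rejected.
    private static final Pattern SAFE_ID = Pattern.compile("[A-Za-z0-9_]+");

    static String createTableSql(String carParkId) {
        if (carParkId == null || !SAFE_ID.matcher(carParkId).matches()) {
            throw new IllegalArgumentException("unsafe car park id: " + carParkId);
        }
        // MySQL's CREATE TABLE ... LIKE copies columns and indexes of the template table.
        return "CREATE TABLE IF NOT EXISTS `t_order_" + carParkId + "` LIKE `t_order_default`";
    }

    public static void main(String[] args) {
        System.out.println(createTableSql("1001"));
    }
}
```

Such a statement would have to be run directly against the real data source before the new table name is added to actual-data-nodes.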

pom file

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <!--mybatisplus-->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>3.3.1.tmp</version>
    </dependency>
    <!--lombok-->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <scope>provided</scope>
    </dependency>
    <!--fastjson-->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.60</version>
    </dependency>
    <!--mysql-->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.44</version>
    </dependency>
    <!--druid-->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.1.20</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.shardingsphere</groupId>
        <artifactId>sharding-jdbc-core</artifactId>
        <version>4.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.shardingsphere</groupId>
        <artifactId>sharding-jdbc-orchestration</artifactId>
        <version>4.0.0</version>
    </dependency>

Pay particular attention to the two packages sharding-jdbc-core and sharding-jdbc-orchestration: both are required.

application.yml

    server:
      port: 8086
      tomcat:
        max-threads: 100
    spring:
      druid:
        datasource:
          type: com.alibaba.druid.pool.DruidDataSource
          url: jdbc:mysql://192.168.4.71:3307/sharding_carPark?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
          driver-class-name: com.mysql.jdbc.Driver
          username: root
          password: 123456
          maxActive: 20
          initialSize: 5
          maxWait: 60000
          minIdle: 5
          timeBetweenEvictionRunsMillis: 60000
          minEvictableIdleTimeMillis: 300000
          validationQuery: SELECT 1 FROM DUAL
          testWhileIdle: true
          testOnBorrow: false
          testOnReturn: false
          # Whether to cache preparedStatements (PSCache). Recommended off for mysql. PSCache greatly improves performance on databases that support cursors, such as oracle.
          poolPreparedStatements: false
          # -1 disables PSCache. To enable it, set a value greater than 0; poolPreparedStatements is then switched to true automatically. The value can be fairly large, e.g. 100.
          maxOpenPreparedStatements: -1
          # Filters for monitoring statistics; without them the monitoring page cannot collect sql statistics. 'wall' is the sql firewall.
          filters: stat,wall,log4j2
          # Enable mergeSql and slow-sql logging through connectProperties
          connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
          # Merge the monitoring data of multiple DruidDataSources
          useGlobalDataSourceStat: true
          loginUsername: druid
          loginPassword: druid
      redis:
        database: 1
        host: 192.168.4.71
        port: 6379
        password: 123456
        jedis:
          pool:
            max-active: 8
            max-wait: -1
            max-idle: 8
            min-idle: 0
    logging:
      level:
        com.example.demo: debug

This configuration file is fairly standard.

LocalRegistryCenter: the local registry center

    package com.example.demo.config.shardingconfig;

    import org.apache.shardingsphere.orchestration.reg.api.RegistryCenter;
    import org.apache.shardingsphere.orchestration.reg.api.RegistryCenterConfiguration;
    import org.apache.shardingsphere.orchestration.reg.listener.DataChangedEventListener;

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ConcurrentHashMap;

    /**
     * An in-memory registry center: configuration lives in a map inside the JVM
     * instead of an external service.
     */
    public class LocalRegistryCenter implements RegistryCenter {

        /** Listeners registered by sharding-jdbc, keyed by the watched path. */
        public static Map<String, DataChangedEventListener> listeners = new ConcurrentHashMap<>();

        private RegistryCenterConfiguration config;

        private Properties properties;

        /**
         * Public so that the configuration does not have to be re-read
         * when the nodes are reset.
         */
        public static Map<String, String> values = new ConcurrentHashMap<>();

        @Override
        public void init(RegistryCenterConfiguration config) {
            this.config = config;
        }

        @Override
        public String get(String key) {
            return values.get(key);
        }

        @Override
        public String getDirectly(String key) {
            return values.get(key);
        }

        @Override
        public boolean isExisted(String key) {
            return values.containsKey(key);
        }

        @Override
        public List<String> getChildrenKeys(String key) {
            return null;
        }

        @Override
        public void persist(String key, String value) {
            values.put(key, value);
        }

        @Override
        public void update(String key, String value) {
            values.put(key, value);
        }

        @Override
        public void persistEphemeral(String key, String value) {
            values.put(key, value);
        }

        @Override
        public void watch(String key, DataChangedEventListener dataChangedEventListener) {
            if (null != dataChangedEventListener) {
                // Cache the data-changed listener so it can be triggered manually later
                listeners.put(key, dataChangedEventListener);
            }
        }

        @Override
        public void close() {
            config = null;
        }

        @Override
        public void initLock(String key) {
        }

        @Override
        public boolean tryLock() {
            return false;
        }

        @Override
        public void tryRelease() {
        }

        @Override
        public String getType() {
            // [Key point 1] This name is referenced again by the configuration class
            return "shardingLocalRegisterCenter";
        }

        @Override
        public Properties getProperties() {
            return properties;
        }

        @Override
        public void setProperties(Properties properties) {
            this.properties = properties;
        }
    }
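
The idea behind this class is easier to see when stripped of the ShardingSphere interfaces: values live in one map, listeners live in another, and whoever changes a value is responsible for calling the listener. The following stand-alone sketch illustrates that pattern. All names in it are invented, and the prefix matching is a simplification chosen for the example rather than the demo's behaviour.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Sketch only: a tiny in-memory "registry" with manual change notification.
class InMemoryRegistrySketch {

    private final Map<String, String> values = new ConcurrentHashMap<>();
    private final Map<String, BiConsumer<String, String>> listeners = new ConcurrentHashMap<>();

    void persist(String key, String value) {
        values.put(key, value);
    }

    String get(String key) {
        return values.get(key);
    }

    // Remembers the listener; nothing is called until publish() is invoked.
    void watch(String key, BiConsumer<String, String> listener) {
        listeners.put(key, listener);
    }

    // Stores the new value, then notifies every listener whose watched key
    // is a prefix of the changed key.
    void publish(String key, String value) {
        values.put(key, value);
        listeners.forEach((watchedKey, listener) -> {
            if (key.startsWith(watchedKey)) {
                listener.accept(key, value);
            }
        });
    }

    public static void main(String[] args) {
        InMemoryRegistrySketch registry = new InMemoryRegistrySketch();
        registry.watch("/demo/config/schema", (k, v) -> System.out.println("changed: " + k));
        registry.publish("/demo/config/schema/logic_db/rule", "new rule text");
    }
}
```

Because nothing outside the JVM is involved, a change takes effect only when the application itself triggers the notification.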

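The next step is important.
Register the registry center in a local file:

  1. Under the resources folder, create a new Directory named META-INF
  2. Under META-INF, create another Directory named services (Java's ServiceLoader only looks in META-INF/services)
  3. Add a file named org.apache.shardingsphere.orchestration.reg.api.RegistryCenter (note that this name is fixed) whose content is com.example.demo.config.shardingconfig.LocalRegistryCenter (the fully qualified name of your local registry center class)
    Like this:
    [Figure 2: sharding-jdbc, how to modify table-sharding rules at runtime (without restarting the project)]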
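
Once the class is registered this way, it is selected by its type name, so the string returned by getType() has to match the name passed to RegistryCenterConfiguration. The sketch below illustrates that kind of lookup with made-up types. It is not ShardingSphere's loader, and the exact comparison the library performs is an assumption here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Sketch only: picking one implementation out of several by its type name.
class RegistryTypeLookupSketch {

    // Hypothetical stand-in for a service that identifies itself by a type string.
    interface TypedService {
        String getType();
    }

    static TypedService named(String type) {
        return () -> type;
    }

    // Returns the first candidate whose type equals the requested one.
    static Optional<TypedService> findByType(List<TypedService> candidates, String type) {
        return candidates.stream()
                .filter(candidate -> type.equals(candidate.getType()))
                .findFirst();
    }

    public static void main(String[] args) {
        List<TypedService> candidates =
                Arrays.asList(named("zookeeper"), named("shardingLocalRegisterCenter"));
        System.out.println(findByType(candidates, "shardingLocalRegisterCenter").isPresent());
        System.out.println(findByType(candidates, "shardingLocalRegistryCenter").isPresent());
    }
}
```

A one-letter difference between the two strings is enough for the lookup to come back empty, so it is worth copying the name rather than retyping it.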

Java-based configuration class

    package com.example.demo.config.shardingconfig;

    import com.alibaba.druid.filter.Filter;
    import com.alibaba.druid.filter.logging.Slf4jLogFilter;
    import com.alibaba.druid.filter.stat.StatFilter;
    import com.alibaba.druid.pool.DruidDataSource;
    import com.alibaba.druid.util.StringUtils;
    import com.alibaba.druid.wall.WallConfig;
    import com.alibaba.druid.wall.WallFilter;
    import com.example.demo.config.datasource.DataSourceProperties;
    import com.example.demo.config.redis.RedisConfig;
    import com.example.demo.config.redis.RedisPrefixEnum;
    import org.apache.shardingsphere.api.config.sharding.ShardingRuleConfiguration;
    import org.apache.shardingsphere.api.config.sharding.TableRuleConfiguration;
    import org.apache.shardingsphere.api.config.sharding.strategy.StandardShardingStrategyConfiguration;
    import org.apache.shardingsphere.orchestration.config.OrchestrationConfiguration;
    import org.apache.shardingsphere.orchestration.reg.api.RegistryCenterConfiguration;
    import org.apache.shardingsphere.shardingjdbc.orchestration.api.OrchestrationShardingDataSourceFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.autoconfigure.AutoConfigureAfter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.core.RedisTemplate;

    import javax.sql.DataSource;
    import java.sql.SQLException;
    import java.util.*;

    /**
     * Builds the sharding-jdbc data source with orchestration enabled.
     *
     * @author zhy
     * @date 2020/5/9 10:23
     */
    @Configuration
    @AutoConfigureAfter({DataSourceProperties.class, RedisConfig.class})
    public class ShardingRuleConfig {

        private String defaultDataSource = DatasourceEnum.DEFAULT.getValue();

        @Autowired
        private RedisTemplate<String, Object> redisTemplate;

        @Autowired
        private DataSourceProperties properties;

        /**
         * The sharding-jdbc data source.
         *
         * @return javax.sql.DataSource
         * @author zhy
         * @date 2020/5/9 10:33
         */
        @Bean
        public DataSource dataSource() throws SQLException {
            // Configure the real data sources
            Map<String, DataSource> dataSourceMap = new HashMap<>();
            // Multiple data sources can be configured here
            // Data source 1
            DruidDataSource dataSource0 = druidDataSource();
            dataSourceMap.put(defaultDataSource, dataSource0);
            // Data source 2
            // DruidDataSource dataSource1 = createDb1();
            // dataSourceMap.put("ds1", dataSource1);

            // Configure the sharding rules
            ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();
            // Sharding rule for the order table
            TableRuleConfiguration orderRuleConfig = orderRuleConfig();
            shardingRuleConfig.getTableRuleConfigs().add(orderRuleConfig);
            // More sharding rules can be added with add
            //shardingRuleConfig.getTableRuleConfigs().add(orderRuleConfig);
            // With multiple data sources a default must be specified; with a single one it is not needed
            //shardingRuleConfig.setDefaultDataSourceName("ds0");

            Properties p = new Properties();
            // Print sql statements; turn this off in production
            p.setProperty("sql.show", Boolean.TRUE.toString());

            // The type name must match LocalRegistryCenter.getType() (key point 1)
            OrchestrationConfiguration orchestrationConfig = new OrchestrationConfiguration(
                    "orchestration-sharding-data-source",
                    new RegistryCenterConfiguration("shardingLocalRegisterCenter"),
                    false);
            return OrchestrationShardingDataSourceFactory.createDataSource(dataSourceMap, shardingRuleConfig, p,
                    orchestrationConfig);
        }

        /**
         * Sharding rule for the order table.
         *
         * @return org.apache.shardingsphere.api.config.sharding.TableRuleConfiguration
         * @author zhy
         * @date 2020/5/7 10:28
         */
        private TableRuleConfiguration orderRuleConfig() {
            String logicTable = ShardingTableEnum.ORDER.getValue();
            String orderNodesByRedisCarPark = getActualDataNodesByCatalog(ShardingTableEnum.ORDER);
            // t_order_default is the default table; it must be created in advance so the first startup does not fail.
            // The prefix is the configured default data source (ds0 in this demo).
            String actualDataNodes = StringUtils.isEmpty(orderNodesByRedisCarPark)
                    ? defaultDataSource + ".t_order_default"
                    : orderNodesByRedisCarPark;
            TableRuleConfiguration tableRuleConfig = new TableRuleConfiguration(logicTable, actualDataNodes);
            // Set the table-sharding strategy
            tableRuleConfig.setTableShardingStrategyConfig(
                    new StandardShardingStrategyConfiguration("car_park_id", new CarParkShardingTableAlgorithm()));
            // Store the rule in redis scored by time, so it is easy to read and replace later
            redisTemplate.opsForZSet().add(RedisPrefixEnum.SHARDING_RULE_ORDER.getValue(), actualDataNodes,
                    System.currentTimeMillis());
            return tableRuleConfig;
        }

        /**
         * Builds the initial actualDataNodes for the given logic table.
         *
         * @param logicTable the logic table to shard
         * @return the comma-separated actualDataNodes, or null if there are no car parks yet
         * @author zhy
         * @date 2020/5/11 14:52
         */
        public String getActualDataNodesByCatalog(ShardingTableEnum logicTable) {
            String redisKey = RedisPrefixEnum.CAR_PARK_ID_CATALOG.getValue();
            // Fetch all car parks
            Set<Object> keys = redisTemplate.opsForHash().keys(redisKey);
            if (keys.isEmpty()) {
                return null;
            }
            StringBuilder sb = new StringBuilder();
            keys.forEach(obj -> sb.append(defaultDataSource).append(".")
                    .append(logicTable.getValue()).append("_").append(obj.toString()).append(","));
            // Drop the trailing comma
            sb.deleteCharAt(sb.length() - 1);
            return sb.toString();
        }

        /**
         * Creates the druid connection pool.
         *
         * @return com.alibaba.druid.pool.DruidDataSource
         * @author zhy
         * @date 2020/5/7 10:29
         */
        private DruidDataSource druidDataSource() {
            DruidDataSource dataSource = new DruidDataSource();
            dataSource.setDriverClassName(properties.getDriverClassName());
            dataSource.setUrl(properties.getUrl());
            dataSource.setUsername(properties.getUsername());
            dataSource.setPassword(properties.getPassword());
            dataSource.setInitialSize(properties.getInitialSize());
            dataSource.setMinIdle(properties.getMinIdle());
            dataSource.setMaxActive(properties.getMaxActive());
            dataSource.setMaxWait(properties.getMaxWait());
            dataSource.setTimeBetweenEvictionRunsMillis(properties.getTimeBetweenEvictionRunsMillis());
            dataSource.setMinEvictableIdleTimeMillis(properties.getMinEvictableIdleTimeMillis());
            String validationQuery = properties.getValidationQuery();
            if (validationQuery != null && !"".equals(validationQuery)) {
                dataSource.setValidationQuery(validationQuery);
            }
            dataSource.setTestWhileIdle(properties.isTestWhileIdle());
            dataSource.setTestOnBorrow(properties.isTestOnBorrow());
            dataSource.setTestOnReturn(properties.isTestOnReturn());
            if (properties.isPoolPreparedStatements()) {
                dataSource.setMaxPoolPreparedStatementPerConnectionSize(
                        properties.getMaxPoolPreparedStatementPerConnectionSize());
            }
            String connectionPropertiesStr = properties.getConnectionProperties();
            if (connectionPropertiesStr != null && !"".equals(connectionPropertiesStr)) {
                Properties connectProperties = new Properties();
                // Format: key1=value1;key2=value2
                for (String propertiesTmp : connectionPropertiesStr.split(";")) {
                    // Split on the first '=' only and skip malformed entries
                    String[] obj = propertiesTmp.split("=", 2);
                    if (obj.length == 2) {
                        connectProperties.put(obj[0], obj[1]);
                    }
                }
                dataSource.setConnectProperties(connectProperties);
            }
            dataSource.setUseGlobalDataSourceStat(properties.isUseGlobalDataSourceStat());

            WallConfig wallConfig = new WallConfig();
            wallConfig.setMultiStatementAllow(true);
            WallFilter wallFilter = new WallFilter();
            wallFilter.setConfig(wallConfig);

            // Enable the logging filter so sql can be recorded through log4j2;
            // configured in application.yml via [logging:config: classpath:logConfig/log4j2.xml]
            Slf4jLogFilter slf4jLogFilter = new Slf4jLogFilter();
            slf4jLogFilter.setStatementCreateAfterLogEnabled(false);
            slf4jLogFilter.setStatementCloseAfterLogEnabled(false);
            slf4jLogFilter.setResultSetOpenAfterLogEnabled(false);
            slf4jLogFilter.setResultSetCloseAfterLogEnabled(false);

            List<Filter> filters = new ArrayList<>();
            filters.add(wallFilter);
            filters.add(new StatFilter());
            filters.add(slf4jLogFilter);
            dataSource.setProxyFilters(filters);
            return dataSource;
        }
    }
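
The configuration above refers to CarParkShardingTableAlgorithm, which is not listed in this article. In sharding-jdbc 4.0.0 the algorithm passed to StandardShardingStrategyConfiguration implements PreciseShardingAlgorithm, whose doSharding method receives the available physical table names and the sharding value. The routing it has to perform is presumably along the following lines. This is a dependency-free sketch with invented names, not the class from the demo project.

```java
import java.util.Arrays;
import java.util.Collection;

// Sketch only: the routing decision a car-park-based table algorithm has to make.
class CarParkRoutingSketch {

    // availableTargetNames: physical tables currently listed in actual-data-nodes.
    // Returns the table for this car park, or fails if it has not been registered yet.
    static String route(Collection<String> availableTargetNames, String logicTable, String carParkId) {
        String target = logicTable + "_" + carParkId;
        if (availableTargetNames.contains(target)) {
            return target;
        }
        throw new IllegalStateException(
                "No physical table " + target + " among " + availableTargetNames);
    }

    public static void main(String[] args) {
        Collection<String> tables = Arrays.asList("t_order_1001", "t_order_1002");
        System.out.println(route(tables, "t_order", "1002"));
    }
}
```

Failing loudly for an unknown car park id is one possible design choice; routing to a fallback table would be another.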

The shardingService that performs the dynamic replacement

    package com.example.demo.config.shardingconfig;

    import com.example.demo.config.redis.RedisConfig;
    import com.example.demo.config.redis.RedisPrefixEnum;
    import org.apache.shardingsphere.orchestration.reg.listener.DataChangedEvent;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.autoconfigure.AutoConfigureAfter;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Component;

    import java.util.Set;

    /**
     * Replaces the table-sharding rule at runtime.
     *
     * @author zhy
     * @date 2020/5/11 15:08
     */
    @Component
    @AutoConfigureAfter({RedisConfig.class})
    public class ShardingService {

        /** Path the schema listener is registered under. */
        private static final String SCHEMA_PATH = "/orchestration-sharding-data-source/config/schema";

        /** Path the sharding rule is stored under. */
        private static final String RULE_PATH = SCHEMA_PATH + "/logic_db/rule";

        @Autowired
        private RedisTemplate<String, Object> redisTemplate;

        @Autowired
        private ShardingRuleConfig shardingRuleConfig;

        /**
         * Replaces the value of ActualDataNodes in the sharding rule.
         *
         * @param oldRule the actualDataNodes value currently in effect
         * @param newRule the actualDataNodes value to switch to
         * @author zhy
         * @date 2020/5/11 15:12
         */
        public void replaceActualDataNodes(String oldRule, String newRule) {
            // Read the existing configuration
            String rules = LocalRegistryCenter.values.get(RULE_PATH);
            if (rules == null) {
                throw new IllegalStateException("No sharding rule found at " + RULE_PATH);
            }
            // Modify the rule
            String rule = rules.replace(oldRule, newRule);
            // Fire the cached listener so sharding-jdbc reloads the rule
            LocalRegistryCenter.listeners.get(SCHEMA_PATH)
                    .onChange(new DataChangedEvent(RULE_PATH, rule, DataChangedEvent.ChangedType.UPDATED));
            LocalRegistryCenter.values.put(RULE_PATH, rule);
        }

        /**
         * Returns the sharding rule currently in effect.
         *
         * @param shardingTableEnum the logic table
         * @return the latest actualDataNodes stored in redis, or null if none is stored
         * @author zhy
         * @date 2020/5/11 15:56
         */
        public String getActualDataNodesInRedis(ShardingTableEnum shardingTableEnum) {
            String redisKey = RedisPrefixEnum.SHARDING_RULE_ORDER.getValue();
            // Highest score first; the range 0..0 yields exactly the newest record
            Set<Object> objects = redisTemplate.opsForZSet().reverseRange(redisKey, 0, 0);
            if (objects == null || objects.isEmpty()) {
                return null;
            }
            return objects.iterator().next().toString();
        }

        /**
         * Builds the sharding rule from the car park ids stored in redis.
         *
         * @param shardingTableEnum the logic table
         * @return java.lang.String
         * @author zhy
         * @date 2020/5/11 16:09
         */
        public String getActualDataNodesByCatalog(ShardingTableEnum shardingTableEnum) {
            return shardingRuleConfig.getActualDataNodesByCatalog(shardingTableEnum);
        }
    }
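
How these three methods fit together is not spelled out here. Presumably, after a car park is added or removed, the caller reads the value currently in effect with getActualDataNodesInRedis, computes the new one with getActualDataNodesByCatalog, passes both to replaceActualDataNodes, and then stores the new value in the zset. The heart of it is a plain string substitution inside the stored rule text, which the following stand-alone sketch mimics. The class name and the sample rule text are made up for illustration and are not what sharding-jdbc actually stores.

```java
// Sketch only: the string substitution behind replaceActualDataNodes.
class RuleReplaceSketch {

    // Replaces the old actual-data-nodes value inside the rule text and
    // fails loudly if the old value is not there, since a silent no-op
    // would leave the old sharding rule in effect.
    static String replaceNodes(String ruleText, String oldNodes, String newNodes) {
        if (!ruleText.contains(oldNodes)) {
            throw new IllegalStateException("old value not found in rule: " + oldNodes);
        }
        return ruleText.replace(oldNodes, newNodes);
    }

    public static void main(String[] args) {
        // Illustrative rule text only
        String rule = "tables:\n  t_order:\n    actualDataNodes: ds0.t_order_default\n";
        String updated = replaceNodes(rule, "ds0.t_order_default", "ds0.t_order_1001,ds0.t_order_1002");
        System.out.print(updated);
    }
}
```

Because the substitution works on text, the old value passed in has to match exactly what was stored, which is why the latest value is kept in redis.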

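Finally, the source code

https://github.com/zhyhuayong/shardingJavaDemo
Feel free to consult the source code. If you want to test it, remember to change the database configuration to your own.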