写在前面
上一篇文章出现的问题,就是sharing-jdbc无法根据一个不固定的字段(停车场id)进行动态分表,因为actual-data-nodes是在项目启动的时候就加载好的,不支持动态修改。
还好这一切都是可以解决的。
那么这一篇文章,就是解决了actual-data-nodes动态修改问题。解决方案大致说明一下就是基于sharding-jdbc + sharding的服务编排治理+redis,实现了订单表根据停车场id动态分表,每增删停车场,在不重启项目的情况下动态的改变actual-data-nodes
思路
首先参考了这篇老哥的博文:https://blog.csdn.net/qq_32588349/article/details/99440985 给了我很大的启发
根据官方文档描述,shardingsphere提供了配置中心、注册中心的服务治理功能。并且有这句描述:
这应该就是我想要的东西,但是我确实不知道该怎么入手,就让公司的架构师给我处理了一下。然后最终得到了下面这个方案。在这里要感谢两位老哥!
大概的描述一下本demo的业务:
- sharing-jdbc : 引入基于java的配置包(不用starter包)
- 需要分的表为:订单表(t_order)
- 分表的依据字段:停车场id(car_park_id)
- 分库字段:不需要分库
- 初始化数据库要有一个默认表:t_order_defalut,这个表只是为了第一次启动项目,还没有停车场信息的时候,用来默认的,里面不会存任何数据
- redis hash存放<停车场id,停车场名称>
- redis zset存放 订单actual-data-nodes,score为当前时间的时间戳,方便获取最新的用来替换
配置步骤
sql脚本
/*Navicat MySQL Data TransferSource Server : 开发数据库 4.71Source Server Version : 50730Source Host : 192.168.4.71:3307Source Database : sharding_carparkTarget Server Type : MYSQLTarget Server Version : 50730File Encoding : 65001Date: 2020-05-11 18:25:00*/SET FOREIGN_KEY_CHECKS=0;-- ------------------------------ Table structure for t_car_park-- ----------------------------DROP TABLE IF EXISTS `t_car_park`;CREATE TABLE `t_car_park` (`id` varchar(64) NOT NULL,`name` varchar(100) DEFAULT NULL COMMENT '名称',`create_time` datetime DEFAULT NULL COMMENT '创建时间',PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='停车场表';-- ------------------------------ Table structure for t_order_default-- ----------------------------DROP TABLE IF EXISTS `t_order_default`;CREATE TABLE `t_order_default` (`id` varchar(64) NOT NULL,`name` varchar(100) DEFAULT NULL COMMENT '名称',`car_park_id` varchar(64) DEFAULT NULL COMMENT '停车场id',`no` varchar(100) DEFAULT NULL COMMENT '订单号',`create_time` datetime DEFAULT NULL COMMENT '创建时间',PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='测试分表';
pom文件
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!--mybatisplus--><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.3.1.tmp</version></dependency><!--lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><scope>provided</scope></dependency><!--fastjson--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.60</version></dependency><!--mysql--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.44</version></dependency><!--druid--><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.20</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-log4j2</artifactId></dependency><dependency><groupId>org.apache.shardingsphere</groupId><artifactId>sharding-jdbc-core</artifactId><version>4.0.0</version></dependency><dependency><groupId>org.apache.shardingsphere</groupId><artifactId>sharding-jdbc-orchestration</artifactId><version>4.0.0</version></dependency>
重点关注:sharding-jdbc-core、sharding-jdbc-orchestration这两个包,这两个包是必备的。
application.yml
server:port: 8086tomcat:max-threads: 100spring:druid:datasource:type: com.alibaba.druid.pool.DruidDataSourceurl: jdbc:mysql://192.168.4.71:3307/sharding_carPark?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghaidriver-class-name: com.mysql.jdbc.Driverusername: rootpassword: 123456maxActive: 20initialSize: 5maxWait: 60000minIdle: 5timeBetweenEvictionRunsMillis: 60000minEvictableIdleTimeMillis: 300000validationQuery: SELECT 1 FROM DUALtestWhileIdle: truetestOnBorrow: falsetestOnReturn: false#是否缓存preparedStatement,也就是PSCache。在mysql下建议关闭。 PSCache对支持游标的数据库性能提升巨大,比如说oracle。poolPreparedStatements: false#要启用PSCache,-1为关闭 必须配置大于0,当大于0时,poolPreparedStatements自动触发修改为true 可以把这个数值配置大一些,比如说100maxOpenPreparedStatements: -1#配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙filters: stat,wall,log4j2#通过connectProperties属性来打开mergeSql功能;慢SQL记录connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000#合并多个DruidDataSource的监控数据useGlobalDataSourceStat: trueloginUsername: druidloginPassword: druidredis:database: 1host: 192.168.4.71port: 6379password: 123456jedis:pool:max-active: 8max-wait: -1max-idle: 8min-idle: 0logging:level:com.example.demo: debug
这里的配置文件比较常规
LocalRegistryCenter 本地注册中心
package com.example.demo.config.shardingconfig;import org.apache.shardingsphere.orchestration.reg.api.RegistryCenter;import org.apache.shardingsphere.orchestration.reg.api.RegistryCenterConfiguration;import org.apache.shardingsphere.orchestration.reg.listener.DataChangedEventListener;import java.util.List;import java.util.Map;import java.util.Properties;import java.util.concurrent.ConcurrentHashMap;public class LocalRegistryCenter implements RegistryCenter {public static Map<String, DataChangedEventListener> listeners = new ConcurrentHashMap<>();private RegistryCenterConfiguration config;private Properties properties;/*** public 是为了在重置节点的时候减少去重新读配置*/public static Map<String, String> values = new ConcurrentHashMap<>();@Overridepublic void init(RegistryCenterConfiguration config) {this.config = config;}@Overridepublic String get(String key) {return values.get(key);}@Overridepublic String getDirectly(String key) {return values.get(key);}@Overridepublic boolean isExisted(String key) {return values.containsKey(key);}@Overridepublic List<String> getChildrenKeys(String key) {return null;}@Overridepublic void persist(String key, String value) {values.put(key, value);}@Overridepublic void update(String key, String value) {values.put(key, value);}@Overridepublic void persistEphemeral(String key, String value) {values.put(key, value);}@Overridepublic void watch(String key, DataChangedEventListener dataChangedEventListener) {if (null != dataChangedEventListener) {// 将数据改变的事件监听器缓存下来listeners.put(key, dataChangedEventListener);}}@Overridepublic void close() {config = null;}@Overridepublic void initLock(String key) {}@Overridepublic boolean tryLock() {return false;}@Overridepublic void tryRelease() {}@Overridepublic String getType() {// 【关键点1】,留着文章后续引用return "shardingLocalRegisterCenter";}@Overridepublic Properties getProperties() {return properties;}@Overridepublic void setProperties(Properties properties) {this.properties = properties;}}
下面这步很重要
在本地文件中添加注册中心
- 在resources文件夹下面新建Directory,名称为:META-INF
- 在META-INF继续创建名为service的Directory
- 添加file,名为org.apache.shardingsphere.orchestration.reg.api.RegistryCenter(注意这个是固定的),里面的内容是:com.example.demo.config.shardingconfig.LocalRegistryCenter(你本地注册中心类存放的全路径)
像这样
基于java的配置类
package com.example.demo.config.shardingconfig;import com.alibaba.druid.filter.Filter;import com.alibaba.druid.filter.logging.Slf4jLogFilter;import com.alibaba.druid.filter.stat.StatFilter;import com.alibaba.druid.pool.DruidDataSource;import com.alibaba.druid.util.StringUtils;import com.alibaba.druid.wall.WallConfig;import com.alibaba.druid.wall.WallFilter;import com.example.demo.config.datasource.DataSourceProperties;import com.example.demo.config.redis.RedisConfig;import com.example.demo.config.redis.RedisPrefixEnum;import org.apache.shardingsphere.api.config.sharding.ShardingRuleConfiguration;import org.apache.shardingsphere.api.config.sharding.TableRuleConfiguration;import org.apache.shardingsphere.api.config.sharding.strategy.StandardShardingStrategyConfiguration;import org.apache.shardingsphere.orchestration.config.OrchestrationConfiguration;import org.apache.shardingsphere.orchestration.reg.api.RegistryCenterConfiguration;import org.apache.shardingsphere.shardingjdbc.orchestration.api.OrchestrationShardingDataSourceFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.autoconfigure.AutoConfigureAfter;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.data.redis.core.RedisTemplate;import javax.sql.DataSource;import java.sql.SQLException;import java.util.*;/*** @title: ShardingRuleConfig* @projectName shardingJavaDemo* @description: TODO* @author zhy* @date 2020/5/910:23*/@Configuration@AutoConfigureAfter({DataSourceProperties.class, RedisConfig.class})public class ShardingRuleConfig {private String defaultDataSource = DatasourceEnum.DEFAULT.getValue();@Autowiredprivate RedisTemplate<String,Object> redisTemplate;@Autowiredprivate DataSourceProperties properties;/*** shardingjdbc数据源* @param* @throws* @return javax.sql.DataSource* @author zhy* @date 2020/5/9 10:33*/@Beanpublic DataSource dataSource() throws SQLException {// 配置真实数据源Map<String, DataSource> dataSourceMap = new HashMap<>();//多数据源配置//数据源1DruidDataSource dataSource0 = druidDataSource();dataSourceMap.put(defaultDataSource, dataSource0);//数据源2// DruidDataSource dataSource1 = createDb1();// dataSourceMap.put("ds1", dataSource1);// 配置分片规则ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();//订单表分片规则TableRuleConfiguration orderRuleConfig = orderRuleConfig();shardingRuleConfig.getTableRuleConfigs().add(orderRuleConfig);//可以继续用add添加分片规则//shardingRuleConfig.getTableRuleConfigs().add(orderRuleConfig);//多数据源一定要指定默认数据源,只有一个数据源就不需要//shardingRuleConfig.setDefaultDataSourceName("ds0");Properties p = new Properties();//打印sql语句,生产环境关闭p.setProperty("sql.show",Boolean.TRUE.toString());OrchestrationConfiguration orchestrationConfig = new OrchestrationConfiguration("orchestration-sharding-data-source", new RegistryCenterConfiguration("shardingLocalRegisterCenter"),false);return OrchestrationShardingDataSourceFactory.createDataSource(dataSourceMap, shardingRuleConfig, p,orchestrationConfig);}/*** 订单分片规则* @param* @throws* @return io.shardingjdbc.core.api.config.TableRuleConfiguration* @author zhy* @date 2020/5/7 10:28*/private TableRuleConfiguration orderRuleConfig(){String logicTable = ShardingTableEnum.ORDER.getValue();String orderNodesByRedisCarPark = getActualDataNodesByCatalog(ShardingTableEnum.ORDER);//t_order_default 这张表是默认表,需要事先建好,防止首次启动报错String actualDataNodes = StringUtils.isEmpty(orderNodesByRedisCarPark) ? "ds0.t_order_default" : orderNodesByRedisCarPark;TableRuleConfiguration tableRuleConfig = new TableRuleConfiguration(logicTable,actualDataNodes);//设置分表策略tableRuleConfig.setTableShardingStrategyConfig(new StandardShardingStrategyConfiguration("car_park_id",new CarParkShardingTableAlgorithm()));//根据时间将策略放进redis中,方便读取替换redisTemplate.opsForZSet().add(RedisPrefixEnum.SHARDING_RULE_ORDER.getValue(),actualDataNodes,new Date().getTime());return tableRuleConfig;}/*** 根据分表类型获取初始化actualDataNodes* @param logicTable* @throws* @return java.lang.String* @author zhy* @date 2020/5/11 14:52*/public String getActualDataNodesByCatalog(ShardingTableEnum logicTable){String redisKey = RedisPrefixEnum.CAR_PARK_ID_CATALOG.getValue();//获取所有的停车场Set<Object> keys = redisTemplate.opsForHash().keys(redisKey);if (keys.isEmpty()){return null;}StringBuilder sb = new StringBuilder();keys.forEach(obj -> {sb.append(defaultDataSource).append(".").append(logicTable.getValue()).append("_").append(obj.toString()).append(",");});sb.deleteCharAt(sb.length() - 1);return sb.toString();}/*** 获取druid数据库链接* @param* @throws* @return com.alibaba.druid.pool.DruidDataSource* @author zhy* @date 2020/5/7 10:29*/private DruidDataSource druidDataSource() {DruidDataSource dataSource = new DruidDataSource();dataSource.setDriverClassName(properties.getDriverClassName());dataSource.setUrl(properties.getUrl());dataSource.setUsername(properties.getUsername());dataSource.setPassword(properties.getPassword());dataSource.setInitialSize(properties.getInitialSize());dataSource.setMinIdle(properties.getMinIdle());dataSource.setMaxActive(properties.getMaxActive());dataSource.setMaxWait(properties.getMaxWait());dataSource.setTimeBetweenEvictionRunsMillis(properties.getTimeBetweenEvictionRunsMillis());dataSource.setMinEvictableIdleTimeMillis(properties.getMinEvictableIdleTimeMillis());String validationQuery = properties.getValidationQuery();if (validationQuery != null && !"".equals(validationQuery)) {dataSource.setValidationQuery(validationQuery);}dataSource.setTestWhileIdle(properties.isTestWhileIdle());dataSource.setTestOnBorrow(properties.isTestOnBorrow());dataSource.setTestOnReturn(properties.isTestOnReturn());if (properties.isPoolPreparedStatements()) {dataSource.setMaxPoolPreparedStatementPerConnectionSize(properties.getMaxPoolPreparedStatementPerConnectionSize());}String connectionPropertiesStr = properties.getConnectionProperties();if (connectionPropertiesStr != null && !"".equals(connectionPropertiesStr)) {Properties connectProperties = new Properties();String[] propertiesList = connectionPropertiesStr.split(";");for (String propertiesTmp : propertiesList) {String[] obj = propertiesTmp.split("=");String key = obj[0];String value = obj[1];connectProperties.put(key, value);}dataSource.setConnectProperties(connectProperties);}dataSource.setUseGlobalDataSourceStat(properties.isUseGlobalDataSourceStat());WallConfig wallConfig = new WallConfig();wallConfig.setMultiStatementAllow(true);WallFilter wallFilter = new WallFilter();wallFilter.setConfig(wallConfig);//打开日志记录过滤器,可通过log4j2,记录sql application.yml中配置【logging:config: classpath:logConfig/log4j2.xml】Slf4jLogFilter slf4jLogFilter = new Slf4jLogFilter();slf4jLogFilter.setStatementCreateAfterLogEnabled(false);slf4jLogFilter.setStatementCloseAfterLogEnabled(false);slf4jLogFilter.setResultSetOpenAfterLogEnabled(false);slf4jLogFilter.setResultSetCloseAfterLogEnabled(false);List<Filter> filters = new ArrayList<>();filters.add(wallFilter);filters.add(new StatFilter());filters.add(slf4jLogFilter);dataSource.setProxyFilters(filters);return dataSource;}}
动态替换的shardingService
package com.example.demo.config.shardingconfig;import com.example.demo.config.datasource.DataSourceProperties;import com.example.demo.config.redis.RedisConfig;import com.example.demo.config.redis.RedisPrefixEnum;import org.apache.shardingsphere.orchestration.reg.listener.DataChangedEvent;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.autoconfigure.AutoConfigureAfter;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.stereotype.Component;import java.util.ArrayList;import java.util.Set;/*** @title: ShardingService* @projectName shardingJavaDemo* @description: TODO* @author zhy* @date 2020/5/1115:08*/@Component@AutoConfigureAfter({RedisConfig.class})public class ShardingService {@Autowiredprivate RedisTemplate<String,Object> redisTemplate;@Autowiredprivate ShardingRuleConfig shardingRuleConfig;/*** 替换sharding里的分表规则ActualDataNodes的值* @param oldRule* @param newRule* @throws* @return void* @author zhy* @date 2020/5/11 15:12*/public void replaceActualDataNodes(String oldRule,String newRule){// 获取已有的配置String rules = LocalRegistryCenter.values.get("/orchestration-sharding-data-source/config/schema/logic_db/rule");// 修改规则String rule = rules.replace(oldRule, newRule);LocalRegistryCenter.listeners.get("/orchestration-sharding-data-source/config/schema").onChange(new DataChangedEvent("/orchestration-sharding-data-source/config/schema/logic_db/rule",rule, DataChangedEvent.ChangedType.UPDATED));LocalRegistryCenter.values.put("/orchestration-sharding-data-source/config/schema/logic_db/rule",rule);}/*** 获取当前的分表规则* @param shardingTableEnum* @throws* @return java.lang.String* @author zhy* @date 2020/5/11 15:56*/public String getActualDataNodesInRedis(ShardingTableEnum shardingTableEnum){String redisKey = RedisPrefixEnum.SHARDING_RULE_ORDER.getValue();//倒序获取一条最新的纪录Set<Object> objects = redisTemplate.opsForZSet().reverseRange(redisKey, 0, 1);return new ArrayList<>(objects).get(0).toString();}/*** 根据redis中存储的停车场id获取分表规则* @param shardingTableEnum* @throws* @return java.lang.String* @author zhy* @date 2020/5/11 16:09*/public String getActualDataNodesByCatalog(ShardingTableEnum shardingTableEnum){return shardingRuleConfig.getActualDataNodesByCatalog(shardingTableEnum);}}
最后贴上源码
https://github.com/zhyhuayong/shardingJavaDemo
大家可以参考源码,要测试的同学记得修改自己的数据库配置哦
