Preface
The problem left over from the previous article was that sharding-jdbc could not shard a table dynamically by a non-fixed field (the car park id), because actual-data-nodes is loaded once at application startup and cannot be modified afterwards.
Fortunately, all of this can be solved.
This article addresses exactly that: modifying actual-data-nodes at runtime. In short, the solution combines sharding-jdbc, ShardingSphere's orchestration (governance) module, and Redis to shard the order table by car park id, so that whenever a car park is added or removed, actual-data-nodes is updated on the fly without restarting the application.
Approach
I first drew a lot of inspiration from this post: https://blog.csdn.net/qq_32588349/article/details/99440985
According to the official documentation, ShardingSphere provides governance features through a config center and a registry center, and notes that configuration changes pushed through them can take effect dynamically.
That sounded like exactly what I needed, but I had no idea where to start, so I asked our company's architect to work it out with me, and we ended up with the scheme below. Thanks to both of them!
A rough outline of the demo:
- sharding-jdbc: use the plain Java configuration packages (not the Spring Boot starter)
- Table to shard: the order table (t_order)
- Sharding column: the car park id (car_park_id)
- Database sharding: not needed
- The database is initialized with a default table, t_order_default, which exists only so the application can start before any car park has been created; it never stores data. Each car park later gets its own physical table t_order_<car_park_id> with the same structure.
- A Redis hash stores <car park id, car park name>
- A Redis zset stores the order table's actual-data-nodes string, with the current timestamp as the score, so the most recent rule can easily be fetched for replacement (see the sketch below)
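To make the Redis layout concrete, here is a purely illustrative sketch of what the two structures end up holding. The key literals ("carpark:catalog", "sharding:rule:order") and the car park ids are assumptions for illustration only; in the demo the real key strings come from RedisPrefixEnum.

package com.example.demo.config.redis;

import org.springframework.data.redis.core.RedisTemplate;

/**
 * Illustration only (not part of the repo): the shape of the data the demo keeps in Redis.
 * The key literals below are assumptions -- the real values come from RedisPrefixEnum.
 */
public class RedisLayoutExample {

    public static void populate(RedisTemplate<String, Object> redisTemplate) {
        // hash: <car park id, car park name>
        redisTemplate.opsForHash().put("carpark:catalog", "1001", "North Gate Car Park");
        redisTemplate.opsForHash().put("carpark:catalog", "1002", "South Gate Car Park");
        // zset: member = actual-data-nodes string, score = timestamp, so the newest rule has the highest score
        redisTemplate.opsForZSet().add("sharding:rule:order",
                "ds0.t_order_1001,ds0.t_order_1002", System.currentTimeMillis());
    }
}

With two car parks 1001 and 1002, getActualDataNodesByCatalog (shown later) rebuilds exactly the string stored as the zset member above.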
Configuration steps
SQL script
/*
Navicat MySQL Data Transfer

Source Server         : dev database 4.71
Source Server Version : 50730
Source Host           : 192.168.4.71:3307
Source Database       : sharding_carpark

Target Server Type    : MYSQL
Target Server Version : 50730
File Encoding         : 65001

Date: 2020-05-11 18:25:00
*/

SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------
-- Table structure for t_car_park
-- ----------------------------
DROP TABLE IF EXISTS `t_car_park`;
CREATE TABLE `t_car_park` (
  `id` varchar(64) NOT NULL,
  `name` varchar(100) DEFAULT NULL COMMENT 'name',
  `create_time` datetime DEFAULT NULL COMMENT 'creation time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='car park table';

-- ----------------------------
-- Table structure for t_order_default
-- ----------------------------
DROP TABLE IF EXISTS `t_order_default`;
CREATE TABLE `t_order_default` (
  `id` varchar(64) NOT NULL,
  `name` varchar(100) DEFAULT NULL COMMENT 'name',
  `car_park_id` varchar(64) DEFAULT NULL COMMENT 'car park id',
  `no` varchar(100) DEFAULT NULL COMMENT 'order no',
  `create_time` datetime DEFAULT NULL COMMENT 'creation time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='test sharded table (default fallback)';
pom file
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    <exclusions>
        <exclusion>
            <groupId>org.junit.vintage</groupId>
            <artifactId>junit-vintage-engine</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- mybatis-plus -->
<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.3.1.tmp</version>
</dependency>
<!-- lombok -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <scope>provided</scope>
</dependency>
<!-- fastjson -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.60</version>
</dependency>
<!-- mysql -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>
<!-- druid -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.20</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>sharding-jdbc-core</artifactId>
    <version>4.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>sharding-jdbc-orchestration</artifactId>
    <version>4.0.0</version>
</dependency>
Pay particular attention to sharding-jdbc-core and sharding-jdbc-orchestration: these two dependencies are required.
application.yml
server:
  port: 8086
  tomcat:
    max-threads: 100
spring:
  druid:
    datasource:
      type: com.alibaba.druid.pool.DruidDataSource
      url: jdbc:mysql://192.168.4.71:3307/sharding_carPark?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
      driver-class-name: com.mysql.jdbc.Driver
      username: root
      password: 123456
      maxActive: 20
      initialSize: 5
      maxWait: 60000
      minIdle: 5
      timeBetweenEvictionRunsMillis: 60000
      minEvictableIdleTimeMillis: 300000
      validationQuery: SELECT 1 FROM DUAL
      testWhileIdle: true
      testOnBorrow: false
      testOnReturn: false
      # whether to cache prepared statements (PSCache); recommended off for MySQL,
      # PSCache mainly benefits cursor-based databases such as Oracle
      poolPreparedStatements: false
      # to enable PSCache this must be > 0 (-1 disables it); when > 0, poolPreparedStatements
      # is automatically switched to true; a larger value such as 100 is fine
      maxOpenPreparedStatements: -1
      # filters for monitoring/statistics interception; without 'stat' the monitoring page
      # cannot aggregate SQL, 'wall' is the SQL firewall
      filters: stat,wall,log4j2
      # enable mergeSql and slow-SQL logging via connectionProperties
      connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
      # merge monitoring data of multiple DruidDataSource instances
      useGlobalDataSourceStat: true
      loginUsername: druid
      loginPassword: druid
  redis:
    database: 1
    host: 192.168.4.71
    port: 6379
    password: 123456
    jedis:
      pool:
        max-active: 8
        max-wait: -1
        max-idle: 8
        min-idle: 0
logging:
  level:
    com.example.demo: debug
The configuration here is fairly standard.
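The Druid block above is bound to a custom properties class, DataSourceProperties, which the configuration class below references but which is not shown in this post. A minimal sketch of what it might look like, assuming the prefix spring.druid.datasource and the fields read by the druidDataSource() method further down (the real class is in the linked repo):

package com.example.demo.config.datasource;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * Sketch of the properties holder bound to the spring.druid.datasource block above.
 * Field names follow the YAML keys; Spring Boot's relaxed binding maps both
 * kebab-case and camelCase keys onto them.
 */
@Data
@Component
@ConfigurationProperties(prefix = "spring.druid.datasource")
public class DataSourceProperties {

    private String url;
    private String driverClassName;
    private String username;
    private String password;

    private int initialSize;
    private int minIdle;
    private int maxActive;
    private long maxWait;
    private long timeBetweenEvictionRunsMillis;
    private long minEvictableIdleTimeMillis;
    private String validationQuery;
    private boolean testWhileIdle;
    private boolean testOnBorrow;
    private boolean testOnReturn;
    private boolean poolPreparedStatements;
    private int maxPoolPreparedStatementPerConnectionSize;
    private String connectionProperties;
    private boolean useGlobalDataSourceStat;
}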
LocalRegistryCenter: the local registry center
package com.example.demo.config.shardingconfig;

import org.apache.shardingsphere.orchestration.reg.api.RegistryCenter;
import org.apache.shardingsphere.orchestration.reg.api.RegistryCenterConfiguration;
import org.apache.shardingsphere.orchestration.reg.listener.DataChangedEventListener;

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

public class LocalRegistryCenter implements RegistryCenter {

    public static Map<String, DataChangedEventListener> listeners = new ConcurrentHashMap<>();

    private RegistryCenterConfiguration config;

    private Properties properties;

    /**
     * Kept public so the rule-replacement code can reuse it without re-reading the configuration.
     */
    public static Map<String, String> values = new ConcurrentHashMap<>();

    @Override
    public void init(RegistryCenterConfiguration config) {
        this.config = config;
    }

    @Override
    public String get(String key) {
        return values.get(key);
    }

    @Override
    public String getDirectly(String key) {
        return values.get(key);
    }

    @Override
    public boolean isExisted(String key) {
        return values.containsKey(key);
    }

    @Override
    public List<String> getChildrenKeys(String key) {
        return null;
    }

    @Override
    public void persist(String key, String value) {
        values.put(key, value);
    }

    @Override
    public void update(String key, String value) {
        values.put(key, value);
    }

    @Override
    public void persistEphemeral(String key, String value) {
        values.put(key, value);
    }

    @Override
    public void watch(String key, DataChangedEventListener dataChangedEventListener) {
        if (null != dataChangedEventListener) {
            // cache the data-changed event listener so it can be triggered manually later
            listeners.put(key, dataChangedEventListener);
        }
    }

    @Override
    public void close() {
        config = null;
    }

    @Override
    public void initLock(String key) {
    }

    @Override
    public boolean tryLock() {
        return false;
    }

    @Override
    public void tryRelease() {
    }

    @Override
    public String getType() {
        // [Key point 1] -- this type name is referenced again later in the article
        return "shardingLocalRegisterCenter";
    }

    @Override
    public Properties getProperties() {
        return properties;
    }

    @Override
    public void setProperties(Properties properties) {
        this.properties = properties;
    }
}
The next step is very important.
Register the registry center via a local SPI file
- In the resources folder, create a new directory named META-INF
- Inside META-INF, create another directory named services
- Add a file named org.apache.shardingsphere.orchestration.reg.api.RegistryCenter (this file name is fixed), containing a single line: com.example.demo.config.shardingconfig.LocalRegistryCenter (the fully qualified name of your local registry center class)
The resulting structure under src/main/resources looks like this:
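src/main/resources
└── META-INF
    └── services
        └── org.apache.shardingsphere.orchestration.reg.api.RegistryCenter   (content: com.example.demo.config.shardingconfig.LocalRegistryCenter)

ShardingSphere loads the class through the standard Java SPI mechanism and selects it by matching the type string passed to RegistryCenterConfiguration ("shardingLocalRegisterCenter", used in the configuration class below) against the value returned by getType() -- the [Key point 1] flagged above.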
Java-based configuration class
package com.example.demo.config.shardingconfig;

import com.alibaba.druid.filter.Filter;
import com.alibaba.druid.filter.logging.Slf4jLogFilter;
import com.alibaba.druid.filter.stat.StatFilter;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.util.StringUtils;
import com.alibaba.druid.wall.WallConfig;
import com.alibaba.druid.wall.WallFilter;
import com.example.demo.config.datasource.DataSourceProperties;
import com.example.demo.config.redis.RedisConfig;
import com.example.demo.config.redis.RedisPrefixEnum;
import org.apache.shardingsphere.api.config.sharding.ShardingRuleConfiguration;
import org.apache.shardingsphere.api.config.sharding.TableRuleConfiguration;
import org.apache.shardingsphere.api.config.sharding.strategy.StandardShardingStrategyConfiguration;
import org.apache.shardingsphere.orchestration.config.OrchestrationConfiguration;
import org.apache.shardingsphere.orchestration.reg.api.RegistryCenterConfiguration;
import org.apache.shardingsphere.shardingjdbc.orchestration.api.OrchestrationShardingDataSourceFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;

import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.*;

/**
 * @title: ShardingRuleConfig
 * @projectName shardingJavaDemo
 * @description: TODO
 * @author zhy
 * @date 2020/5/9 10:23
 */
@Configuration
@AutoConfigureAfter({DataSourceProperties.class, RedisConfig.class})
public class ShardingRuleConfig {

    private String defaultDataSource = DatasourceEnum.DEFAULT.getValue();

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private DataSourceProperties properties;

    /**
     * Sharding-JDBC data source
     * @return javax.sql.DataSource
     * @author zhy
     * @date 2020/5/9 10:33
     */
    @Bean
    public DataSource dataSource() throws SQLException {
        // configure the actual data sources
        Map<String, DataSource> dataSourceMap = new HashMap<>();
        // data source 1
        DruidDataSource dataSource0 = druidDataSource();
        dataSourceMap.put(defaultDataSource, dataSource0);
        // data source 2 (add more the same way if you shard across databases)
        // DruidDataSource dataSource1 = createDb1();
        // dataSourceMap.put("ds1", dataSource1);

        // configure the sharding rules
        ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();
        // sharding rule for the order table
        TableRuleConfiguration orderRuleConfig = orderRuleConfig();
        shardingRuleConfig.getTableRuleConfigs().add(orderRuleConfig);
        // further table rules can be added with add()
        // shardingRuleConfig.getTableRuleConfigs().add(orderRuleConfig);
        // with multiple data sources a default data source must be set; not needed for a single one
        // shardingRuleConfig.setDefaultDataSourceName("ds0");

        Properties p = new Properties();
        // print SQL statements; turn this off in production
        p.setProperty("sql.show", Boolean.TRUE.toString());
        OrchestrationConfiguration orchestrationConfig = new OrchestrationConfiguration(
                "orchestration-sharding-data-source", new RegistryCenterConfiguration("shardingLocalRegisterCenter"),
                false);
        return OrchestrationShardingDataSourceFactory.createDataSource(dataSourceMap, shardingRuleConfig, p,
                orchestrationConfig);
    }

    /**
     * Sharding rule for the order table
     * @return org.apache.shardingsphere.api.config.sharding.TableRuleConfiguration
     * @author zhy
     * @date 2020/5/7 10:28
     */
    private TableRuleConfiguration orderRuleConfig() {
        String logicTable = ShardingTableEnum.ORDER.getValue();
        String orderNodesByRedisCarPark = getActualDataNodesByCatalog(ShardingTableEnum.ORDER);
        // t_order_default is the fallback table; create it beforehand so the first startup does not fail
        String actualDataNodes = StringUtils.isEmpty(orderNodesByRedisCarPark) ? "ds0.t_order_default" : orderNodesByRedisCarPark;
        TableRuleConfiguration tableRuleConfig = new TableRuleConfiguration(logicTable, actualDataNodes);
        // table sharding strategy: shard by car_park_id (see the algorithm sketch after this class)
        tableRuleConfig.setTableShardingStrategyConfig(new StandardShardingStrategyConfiguration("car_park_id", new CarParkShardingTableAlgorithm()));
        // store the rule in Redis with the current timestamp as score, so it can be read back and replaced later
        redisTemplate.opsForZSet().add(RedisPrefixEnum.SHARDING_RULE_ORDER.getValue(), actualDataNodes, new Date().getTime());
        return tableRuleConfig;
    }

    /**
     * Build the initial actualDataNodes for the given logical table from the car parks stored in Redis
     * @param logicTable
     * @return java.lang.String
     * @author zhy
     * @date 2020/5/11 14:52
     */
    public String getActualDataNodesByCatalog(ShardingTableEnum logicTable) {
        String redisKey = RedisPrefixEnum.CAR_PARK_ID_CATALOG.getValue();
        // all car park ids currently registered in the Redis hash
        Set<Object> keys = redisTemplate.opsForHash().keys(redisKey);
        if (keys.isEmpty()) {
            return null;
        }
        StringBuilder sb = new StringBuilder();
        keys.forEach(obj -> {
            sb.append(defaultDataSource).append(".").append(logicTable.getValue()).append("_").append(obj.toString()).append(",");
        });
        sb.deleteCharAt(sb.length() - 1);
        return sb.toString();
    }

    /**
     * Build the Druid data source
     * @return com.alibaba.druid.pool.DruidDataSource
     * @author zhy
     * @date 2020/5/7 10:29
     */
    private DruidDataSource druidDataSource() {
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setDriverClassName(properties.getDriverClassName());
        dataSource.setUrl(properties.getUrl());
        dataSource.setUsername(properties.getUsername());
        dataSource.setPassword(properties.getPassword());
        dataSource.setInitialSize(properties.getInitialSize());
        dataSource.setMinIdle(properties.getMinIdle());
        dataSource.setMaxActive(properties.getMaxActive());
        dataSource.setMaxWait(properties.getMaxWait());
        dataSource.setTimeBetweenEvictionRunsMillis(properties.getTimeBetweenEvictionRunsMillis());
        dataSource.setMinEvictableIdleTimeMillis(properties.getMinEvictableIdleTimeMillis());
        String validationQuery = properties.getValidationQuery();
        if (validationQuery != null && !"".equals(validationQuery)) {
            dataSource.setValidationQuery(validationQuery);
        }
        dataSource.setTestWhileIdle(properties.isTestWhileIdle());
        dataSource.setTestOnBorrow(properties.isTestOnBorrow());
        dataSource.setTestOnReturn(properties.isTestOnReturn());
        if (properties.isPoolPreparedStatements()) {
            dataSource.setMaxPoolPreparedStatementPerConnectionSize(properties.getMaxPoolPreparedStatementPerConnectionSize());
        }
        String connectionPropertiesStr = properties.getConnectionProperties();
        if (connectionPropertiesStr != null && !"".equals(connectionPropertiesStr)) {
            Properties connectProperties = new Properties();
            String[] propertiesList = connectionPropertiesStr.split(";");
            for (String propertiesTmp : propertiesList) {
                String[] obj = propertiesTmp.split("=");
                String key = obj[0];
                String value = obj[1];
                connectProperties.put(key, value);
            }
            dataSource.setConnectProperties(connectProperties);
        }
        dataSource.setUseGlobalDataSourceStat(properties.isUseGlobalDataSourceStat());
        WallConfig wallConfig = new WallConfig();
        wallConfig.setMultiStatementAllow(true);
        WallFilter wallFilter = new WallFilter();
        wallFilter.setConfig(wallConfig);
        // enable the SQL log filter; SQL can be logged through log4j2
        // (set logging.config: classpath:logConfig/log4j2.xml in application.yml)
        Slf4jLogFilter slf4jLogFilter = new Slf4jLogFilter();
        slf4jLogFilter.setStatementCreateAfterLogEnabled(false);
        slf4jLogFilter.setStatementCloseAfterLogEnabled(false);
        slf4jLogFilter.setResultSetOpenAfterLogEnabled(false);
        slf4jLogFilter.setResultSetCloseAfterLogEnabled(false);
        List<Filter> filters = new ArrayList<>();
        filters.add(wallFilter);
        filters.add(new StatFilter());
        filters.add(slf4jLogFilter);
        dataSource.setProxyFilters(filters);
        return dataSource;
    }
}
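The table rule above references CarParkShardingTableAlgorithm, which this post does not show (it is in the linked repo). Here is a minimal sketch of what such an algorithm can look like for ShardingSphere 4.0, assuming it simply maps car_park_id to t_order_<car_park_id> and falls back to the default table; the fallback behaviour is my assumption, not necessarily what the repo does.

package com.example.demo.config.shardingconfig;

import org.apache.shardingsphere.api.sharding.standard.PreciseShardingAlgorithm;
import org.apache.shardingsphere.api.sharding.standard.PreciseShardingValue;

import java.util.Collection;

/**
 * Sketch of a precise sharding algorithm that routes rows to t_order_<car_park_id>.
 */
public class CarParkShardingTableAlgorithm implements PreciseShardingAlgorithm<String> {

    @Override
    public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<String> shardingValue) {
        // logical table + "_" + car_park_id, e.g. t_order_1001
        String targetTable = shardingValue.getLogicTableName() + "_" + shardingValue.getValue();
        for (String each : availableTargetNames) {
            if (each.equals(targetTable)) {
                return each;
            }
        }
        // fall back to the default table created in the SQL script (assumption)
        return shardingValue.getLogicTableName() + "_default";
    }
}

Because routing is a plain string match, the physical table t_order_<car_park_id> must already exist and be present in actual-data-nodes before any order for that car park is written.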
ShardingService: performing the dynamic replacement
package com.example.demo.config.shardingconfig;

import com.example.demo.config.datasource.DataSourceProperties;
import com.example.demo.config.redis.RedisConfig;
import com.example.demo.config.redis.RedisPrefixEnum;
import org.apache.shardingsphere.orchestration.reg.listener.DataChangedEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.Set;

/**
 * @title: ShardingService
 * @projectName shardingJavaDemo
 * @description: TODO
 * @author zhy
 * @date 2020/5/11 15:08
 */
@Component
@AutoConfigureAfter({RedisConfig.class})
public class ShardingService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private ShardingRuleConfig shardingRuleConfig;

    /**
     * Replace the actualDataNodes value inside the running sharding rule
     * @param oldRule
     * @param newRule
     * @author zhy
     * @date 2020/5/11 15:12
     */
    public void replaceActualDataNodes(String oldRule, String newRule) {
        // read the rule configuration currently held by the local registry center
        String rules = LocalRegistryCenter.values
                .get("/orchestration-sharding-data-source/config/schema/logic_db/rule");
        // swap the old actual-data-nodes string for the new one
        String rule = rules.replace(oldRule, newRule);
        // fire the cached listener so ShardingSphere refreshes its rule without a restart
        LocalRegistryCenter.listeners.get("/orchestration-sharding-data-source/config/schema")
                .onChange(new DataChangedEvent(
                        "/orchestration-sharding-data-source/config/schema/logic_db/rule",
                        rule, DataChangedEvent.ChangedType.UPDATED));
        LocalRegistryCenter.values.put("/orchestration-sharding-data-source/config/schema/logic_db/rule", rule);
    }

    /**
     * Get the latest sharding rule stored in Redis
     * @param shardingTableEnum
     * @return java.lang.String
     * @author zhy
     * @date 2020/5/11 15:56
     */
    public String getActualDataNodesInRedis(ShardingTableEnum shardingTableEnum) {
        String redisKey = RedisPrefixEnum.SHARDING_RULE_ORDER.getValue();
        // members ordered by score descending, so index 0 is the most recently stored rule
        Set<Object> objects = redisTemplate.opsForZSet().reverseRange(redisKey, 0, 1);
        return new ArrayList<>(objects).get(0).toString();
    }

    /**
     * Build the sharding rule from the car park ids stored in Redis
     * @param shardingTableEnum
     * @return java.lang.String
     * @author zhy
     * @date 2020/5/11 16:09
     */
    public String getActualDataNodesByCatalog(ShardingTableEnum shardingTableEnum) {
        return shardingRuleConfig.getActualDataNodesByCatalog(shardingTableEnum);
    }
}
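The post never shows the code that actually triggers the replacement, so here is a hedged sketch of how a caller might wire everything together when a new car park is added. The class name, the DDL step, and the overall flow are assumptions for illustration; only the ShardingService and RedisTemplate calls come from the code above.

package com.example.demo.service;

import com.example.demo.config.redis.RedisPrefixEnum;
import com.example.demo.config.shardingconfig.ShardingService;
import com.example.demo.config.shardingconfig.ShardingTableEnum;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

/**
 * Hypothetical caller (not in the repo) showing the flow when a car park is added.
 */
@Service
public class CarParkRegistrationExample {

    @Autowired
    private ShardingService shardingService;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    public void addCarPark(String carParkId, String name) {
        // 1. create the physical table t_order_<carParkId> first (e.g. via a DDL mapper) -- not shown here
        // 2. register the car park in the Redis hash that getActualDataNodesByCatalog reads
        redisTemplate.opsForHash().put(RedisPrefixEnum.CAR_PARK_ID_CATALOG.getValue(), carParkId, name);
        // 3. current rule (from the zset) and the rebuilt rule (from the hash, now including the new car park)
        String oldRule = shardingService.getActualDataNodesInRedis(ShardingTableEnum.ORDER);
        String newRule = shardingService.getActualDataNodesByCatalog(ShardingTableEnum.ORDER);
        // 4. push the new rule into the running configuration and remember it in the zset
        shardingService.replaceActualDataNodes(oldRule, newRule);
        redisTemplate.opsForZSet().add(RedisPrefixEnum.SHARDING_RULE_ORDER.getValue(), newRule, System.currentTimeMillis());
    }
}

The key idea is the order of steps: the physical table and the Redis hash entry must exist before replaceActualDataNodes rewrites the rule held by LocalRegistryCenter and fires the cached listener, so ShardingSphere rebuilds its runtime rule without a restart.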
Finally, the source code:
https://github.com/zhyhuayong/shardingJavaDemo
Feel free to use the source as a reference; if you want to test it, remember to change the database configuration to your own.