在面对较大的数据量时或者面对高并发时经常会对数据库进行分库分表,将数据一分为二或者或者是进行一主多从,今天我们要学习到的就是如何动态的切换数据源以及其中的一些配置,还有分库后面对的JTA事务问题。
多数据源配置
yml文件配置
spring:
profiles:
active: sit
datasource:
type: com.alibaba.druid.pool.DruidDataSource
#url: jdbc:mysql://ip:3306/pandora_sit?autoReconnect=true&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8
#username: xxxx
#password: xxxx
druid:
db-type: MYSQL
driver-class-name: com.mysql.cj.jdbc.Driver
initial-size: 20
max-active: 20
min-idle: 2
max-wait: 60000 #空闲等待超时
time-between-eviction-runs-millis: 60000 # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
min-evictable-idle-time-millis: 300000 # 配置一个连接在池中最小生存的时间,单位是毫秒
sit:
datasource:
driverClassName: com.mysql.cj.jdbc.Driver #可能存在其它类型数据库的可能
url: jdbc:mysql://ip:3306/pandora_sit?autoReconnect=true&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8
username: xxxx
password: xxxx
type: com.alibaba.druid.pool.DruidDataSource
prod:
datasource:
driverClassName: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://ip:3306/pandora_prod?autoReconnect=true&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8
username: xxxx
password: xxxx
type: com.alibaba.druid.pool.DruidDataSource
多数据源配置类
从上面的配置文件把sit和prod两个库的数据源的配置文件加载到DataSourceProperties,然后在根据DataSourceProperties去实例化一个DataSource,因为我们这里有两个数据源所以生成对应的两个DataSourceProperties和DataSource,当然还需要配置对应配置文件扫描的dao层和对应的xml。因为我这里sit和prod都是操作同样的dao和xml所以我这里就合二为一了。有了扫描路径就需要根据动态的数据源配置对应的SqlSessionFactory和事务。
package com.huke.world.config;
import com.baomidou.mybatisplus.core.MybatisConfiguration;
import com.baomidou.mybatisplus.core.MybatisXMLLanguageDriver;
import com.baomidou.mybatisplus.core.config.GlobalConfig;
import com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor;
import com.baomidou.mybatisplus.extension.plugins.PerformanceInterceptor;
import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
import com.huke.world.context.DynamicDataSource;
import com.huke.world.enums.DataSourceEnum;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.plugin.Interceptor;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.type.JdbcType;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import javax.sql.DataSource;
import java.io.IOException;
import java.util.*;
/**
* @author heian
* @date 2021/3/7 12:42 上午
* @description sit多数据源配置
* 参考配置:https://www.cnblogs.com/lucky9322/p/13326622.html
*/
@Slf4j
@Configuration
@MapperScan(basePackages = {"com.huke.app.dao","com.huke.admin.dao","com.huke.world.service.dao"},sqlSessionFactoryRef = "sqlSessionFactory")
public class DataSourceConfig {
@Bean("sitDataSourceProperties")
@Primary
@ConfigurationProperties("sit.datasource")
public DataSourceProperties sitDataSourceProperties() {
return new DataSourceProperties();
}
@Bean("sitDataSource")
@Primary//单接口都实现默认加载此方法,多个同类型bean优先被考虑,这个注解必须要加,因为不加的话spring将分不清楚那个为主数据源(默认数据源)
public DataSource buildSitDataSource(@Qualifier("sitDataSourceProperties") DataSourceProperties properties) {
log.info("加载-------sit数据源配置");
return properties.initializeDataSourceBuilder().build();
}
@Bean("prodDataSourceProperties")
@ConfigurationProperties("prod.datasource")
public DataSourceProperties prodDataSourceProperties() {
return new DataSourceProperties();
}
@Bean("prodDataSource")
public DataSource buildProdDataSource(@Qualifier("prodDataSourceProperties") DataSourceProperties properties) {
log.info("加载-------prod数据源配置");
return properties.initializeDataSourceBuilder().build();
}
/**
* @Qualifier 根据名称进行注入,通常是在具有相同的多个类型的实例的一个注入(例如有多个DataSource类型的实例)
*/
@Bean("dynamicDataSource")
public DynamicDataSource dynamicDataSource(@Qualifier("sitDataSource") DataSource sitDataSource,
@Qualifier("prodDataSource") DataSource prodDataSource) {
Map<Object, Object> targetDataSources = new HashMap<>();
targetDataSources.put(DataSourceEnum.Sit, sitDataSource);
targetDataSources.put(DataSourceEnum.Prod, prodDataSource);
DynamicDataSource dataSource = new DynamicDataSource();
dataSource.setTargetDataSources(targetDataSources);//设置目标数据源集合
return dataSource;
}
@Bean("sqlSessionFactory")
public SqlSessionFactory sqlSessionFactory(@Qualifier("dynamicDataSource") DynamicDataSource dynamicDataSource) throws Exception {
MybatisSqlSessionFactoryBean sqlSessionFactory = new MybatisSqlSessionFactoryBean();
sqlSessionFactory.setDataSource(dynamicDataSource);//指定目标数据源
MybatisConfiguration configuration = new MybatisConfiguration();
configuration.setDefaultScriptingLanguage(MybatisXMLLanguageDriver.class);
configuration.setJdbcTypeForNull(JdbcType.NULL);
sqlSessionFactory.setConfiguration(configuration);
sqlSessionFactory.setMapperLocations(resolveMapperLocations());
sqlSessionFactory.setPlugins(new Interceptor[]{
new PaginationInterceptor(),//分页插件
new PerformanceInterceptor()
// .setFormat(true),
});
sqlSessionFactory.setGlobalConfig(new GlobalConfig().setBanner(false));
return sqlSessionFactory.getObject();
}
/**
* 加载多个mapper路径
*/
public Resource[] resolveMapperLocations() {
ResourcePatternResolver resourceResolver = new PathMatchingResourcePatternResolver();
List<String> mapperLocations = new ArrayList<>();
mapperLocations.add("classpath:mapper/*.xml");
mapperLocations.add("classpath:com/huke/app/mapper/*.xml");
mapperLocations.add("classpath:com/huke/admin/mapper/*.xml");
List<Resource> resources = new ArrayList<>();
for (String mapperLocation : mapperLocations) {
try {
Resource[] mappers = resourceResolver.getResources(mapperLocation);
resources.addAll(Arrays.asList(mappers));
} catch (IOException e) {
}
}
return resources.toArray(new Resource[resources.size()]);
}
//事务
@Bean
public DataSourceTransactionManager transactionManager(DynamicDataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
}
上下文数据源切换
将注入的sit环境和prod环境的的DataSource,通过他们哥俩生成DynamicDataSource,因为它继承了AbstractRoutingDataSource,而里面的determineCurrentLookupKey方法就是动态切换数据源的保证。每次执行sql都会执行DynamicDataSource类的determineCurrentLookupKey()方法,至于是哪里做了拦截,我目前还不清楚,需要等有时间看源码发现其中奥秘。
public enum DataSourceEnum {
Sit,
Prod,
;
}
public class DataSourceContextHolder {
private static final ThreadLocal<DataSourceEnum> tl = ThreadLocal.withInitial(() -> DataSourceEnum.Sit);
public static void setDataSourceType(DataSourceEnum type) {
tl.set(type);
}
public static DataSourceEnum getDataSourceType() {
return tl.get();
}
}
//继承 AbstractRoutingDataSource 类,实现对应数据源key的切换
public class DynamicDataSource extends AbstractRoutingDataSource {
@Override
protected Object determineCurrentLookupKey() {
return DataSourceContextHolder.getDataSourceType();
}
}
Aop切面重置数据源
目的是为了让我们接口方法一进来,去查询当前的数据源是否是sit不是则进行切换,因为可能存在后面有的业务逻辑执行了切换prod数据源。而该线程并没有消亡所以其内部存储的依然是prod数据源,需要重置
package com.huke.world.aspect;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.huke.app.entity.AppMember;
import com.huke.app.service.AppMemberService;
import com.huke.world.common.annotations.WithoutLogin;
import com.huke.world.common.beans.UserBean;
import com.huke.world.common.constants.RedisConstant;
import com.huke.world.common.context.AppContext;
import com.huke.world.common.context.RequestContext;
import com.huke.world.common.exception.CustomException;
import com.huke.world.common.result.HttpCodeEnum;
import com.huke.world.common.utils.RedisUtil;
import com.huke.world.context.DataSourceContextHolder;
import com.huke.world.enums.DataSourceEnum;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.handler.HandlerInterceptorAdapter;
import java.lang.reflect.Method;
import java.time.LocalDateTime;
@Order(1)
@Aspect
@Component
@Slf4j
public class AuthAspect extends HandlerInterceptorAdapter {
@Autowired
private RedisUtil redisUtil;
@Autowired
private AppMemberService memberService;
@Pointcut("execution(* com.huke.world.controller..*.*(..))")
public void aspect() {
}
/**
* 对所有接口进行鉴权,并将接口信息绑定上下文
*/
@Around("aspect()")
public Object process(ProceedingJoinPoint point) throws Throwable {
//因为可能存在当业务完成,数据源已经被切回到prod,所以需要我们在执行controller方法之前将数据库环境设置为sit
DataSourceEnum dataSourceType = DataSourceContextHolder.getDataSourceType();
if (dataSourceType.equals(DataSourceEnum.Prod)) {
DataSourceContextHolder.setDataSourceType(DataSourceEnum.Sit);
}
RequestContext rc = AppContext.getRequestContext();
//并将用户信息绑定到当前上下文,并重设token有效期(30分钟)
String token = rc.getToken();
if (StringUtils.isNotBlank(token)) {
String jsonStr = (String) redisUtil.get(RedisConstant.TOKEN + token);
if (StringUtils.isNotBlank(jsonStr)) {
jsonStr = JSON.parse(jsonStr).toString();
UserBean user = JSONObject.parseObject(jsonStr,UserBean.class);
rc.setUid(user.getId());
rc.setCurrentUser(user);
redisUtil.expire(RedisConstant.TOKEN + token, RedisConstant.USER_TOKEN_EXPIRE);
//每次接口需要去更新一下用户表的update_time
memberService.lambdaUpdate().set(AppMember::getUpdateTime, LocalDateTime.now()).eq(AppMember::getMemberId,user.getId()).update();
}
}
MethodSignature signature = (MethodSignature) point.getSignature();
Method method = signature.getMethod();
WithoutLogin withoutLogin = method.getDeclaredAnnotation(WithoutLogin.class);//登录权限认证
Long uid = rc.getUid();
if (null == uid && null == withoutLogin) {
throw new CustomException(HttpCodeEnum.USERNAME_OR_PASSWORD_ERR);
}
return point.proceed();
}
}
测试
现在我们执行sit数据库的查询和切换到prod数据库的查询,目的达到了。
public Tip dynamicDatasourceTest(){
log.info("当前线程:" + Thread.currentThread().getName());
log.info("当前数据库环境" + DataSourceContextHolder.getDataSourceType().name());
//prod 存在1的用户,不存在223 sit环境存在 223,不存在1
AppMember prodMember = memberService.getById(1);
System.out.println(prodMember);//null
AppMember sitMember = memberService.getById(223);
System.out.println(sitMember);//有值
DataSourceContextHolder.setDataSourceType(DataSourceEnum.Prod);
log.info("当前数据库环境" + DataSourceContextHolder.getDataSourceType().name());
AppMember prodMember2 = memberService.getById(1);
System.out.println(prodMember2);//有值
AppMember sitMember2 = memberService.getById(223);
System.out.println(sitMember2);//null
return TipUtil.success();
}
发现的问题
@Transactional
public Tip dynamicDatasourceTest(){
log.info("当前线程:" + Thread.currentThread().getName());
log.info("当前数据库环境" + DataSourceContextHolder.getDataSourceType().name());
//原本是其它昵称
boolean update = memberService.lambdaUpdate().set(AppMember::getNickname, "测试-胡玲").eq(AppMember::getMemberId, 223).update();
System.out.println(update);
DataSourceContextHolder.setDataSourceType(DataSourceEnum.Prod);
boolean update2 = memberService.lambdaUpdate().set(AppMember::getNickname, "生产-胡玲").eq(AppMember::getMemberId, 1).update();
System.out.println(update2);
int a = 1/0;
return TipUtil.success();
}
在上面代码中,我加上本地事务注解会导致切换prod数据源会失败,原因是spring对@Transactional注解上的类做了个拦截org.springframework.transaction.support.AbstractPlatformTransactionManager#startTransaction
会导致切换数据源失败,如果你非要加本地事务建议缩小拦截的范围(但是依然没生效,未解决)。
提示
所以对于这种情况,我没做深入研究,但是现在有很多中间件可以解决而且也推荐这么做。比如mycat和sharding-jdbc。如果项目不大建议后者,如果项目多设计数据库信息多,建议mycat服务端反向代理这种。