Java Spring

动态数据源是什么?解决了什么问题?

在实际的开发中,同一个项目中使用多个数据源是很常见的场景。比如,一个读写分离的项目存在主数据源与读数据源。
所谓动态数据源,就是通过Spring的一些配置来自动控制某段数据操作逻辑是走哪一个数据源。举个读写分离的例子,项目中引用了两个数据源,master、slave。通过Spring配置或扩展能力来使得一个接口中调用了查询方法会自动使用slave数据源。
一般实现这种效果可以通过:

  1. 使用@MapperScan注解指定某个包下的所有方法走固定的数据源(这个比较死板些,会产生冗余代码,到也可以达到效果,可以作为临时方案使用);
  2. 使用注解+AOP+AbstractRoutingDataSource的形式来指定某个方法下的数据库操作是走那个数据源。
  3. 通过 Sharding-JDBC 组件来实现(需要引入外部依赖,如果项目本身引用了该组件,建议用这种方式实现)

    关键核心类

    这里主要介绍通过注解+AOP+AbstractRoutingDataSource的联动来实现动态数据源的方式。
    一切的起点是AbstractRoutingDataSource这个类,此类实现了 DataSource 接口 ```java public abstract class AbstractRoutingDataSource extends AbstractDataSource implements InitializingBean {

    // …. 省略 …

    @Nullable private Map targetDataSources;

    @Nullable private Map resolvedDataSources;

  1. public void setTargetDataSources(Map<Object, Object> targetDataSources) {
  2. this.targetDataSources = targetDataSources;
  3. }
  4. public void setDefaultTargetDataSource(Object defaultTargetDataSource) {
  5. this.defaultTargetDataSource = defaultTargetDataSource;
  6. }
  7. @Override
  8. public void afterPropertiesSet() {
  9. // 初始化 targetDataSources、resolvedDataSources
  10. if (this.targetDataSources == null) {
  11. throw new IllegalArgumentException("Property 'targetDataSources' is required");
  12. }
  13. this.resolvedDataSources = new HashMap<>(this.targetDataSources.size());
  14. this.targetDataSources.forEach((key, value) -> {
  15. Object lookupKey = resolveSpecifiedLookupKey(key);
  16. DataSource dataSource = resolveSpecifiedDataSource(value);
  17. this.resolvedDataSources.put(lookupKey, dataSource);
  18. });
  19. if (this.defaultTargetDataSource != null) {
  20. this.resolvedDefaultDataSource = resolveSpecifiedDataSource(this.defaultTargetDataSource);
  21. }
  22. }
  23. @Override
  24. public Connection getConnection() throws SQLException {
  25. return determineTargetDataSource().getConnection();
  26. }
  27. @Override
  28. public Connection getConnection(String username, String password) throws SQLException {
  29. return determineTargetDataSource().getConnection(username, password);
  30. }
  31. /**
  • Retrieve the current target DataSource. Determines the
  • {@link #determineCurrentLookupKey() current lookup key}, performs
  • a lookup in the {@link #setTargetDataSources targetDataSources} map,
  • falls back to the specified
  • {@link #setDefaultTargetDataSource default target DataSource} if necessary.
  • @see #determineCurrentLookupKey() */ protected DataSource determineTargetDataSource() {

    1. Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
    2. // @1 start
    3. Object lookupKey = determineCurrentLookupKey();
    4. DataSource dataSource = this.resolvedDataSources.get(lookupKey);
    5. // @1 end
    6. if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
    7. dataSource = this.resolvedDefaultDataSource;
    8. }
    9. if (dataSource == null) {
    10. throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
    11. }
    12. return dataSource;

    }

    /**

  • 返回一个key,这个key用来从 resolvedDataSources 数据源中获取具体的数据源对象 见 @1 */ @Nullable protected abstract Object determineCurrentLookupKey();

}

  1. 可以看到 `AbstractRoutingDataSource` 中有个可扩展抽象方法 `determineCurrentLookupKey()`,利用这个方法可以来实现动态数据源效果。
  2. <a name="RezdX"></a>
  3. ## 从0写一个简单动态数据源组件
  4. 通过实现`AbstractRoutingDataSource` `determineCurrentLookupKey()` 方法动态设置一个key,然后在配置类下通过`setTargetDataSources()`方法设置提前准备好的DataSource Map
  5. <a name="Xavf5"></a>
  6. ### 注解、常量定义、ThreadLocal 准备
  7. ```java
  8. /**
  9. * @Summary 动态数据源注解定义
  10. */
  11. @Target(ElementType.METHOD)
  12. @Retention(RetentionPolicy.RUNTIME)
  13. public @interface MyDS {
  14. String value() default "default";
  15. }
  16. /**
  17. * @author axin
  18. * @Summary 动态数据源常量
  19. */
  20. public interface DSConst {
  21. String 默认 = "default";
  22. String 主库 = "master";
  23. String 从库 = "slave";
  24. String 统计 = "stat";
  25. }
  1. /**
  2. * @Summary 动态数据源 ThreadLocal 工具
  3. */
  4. public class DynamicDataSourceHolder {
  5. //保存当前线程所指定的DataSource
  6. private static final ThreadLocal<String> THREAD_DATA_SOURCE = new ThreadLocal<>();
  7. public static String getDataSource() {
  8. return THREAD_DATA_SOURCE.get();
  9. }
  10. public static void setDataSource(String dataSource) {
  11. THREAD_DATA_SOURCE.set(dataSource);
  12. }
  13. public static void removeDataSource() {
  14. THREAD_DATA_SOURCE.remove();
  15. }
  16. }

自定义一个 AbstractRoutingDataSource

  1. /**
  2. * @Summary 动态数据源
  3. */
  4. public class DynamicDataSource extends AbstractRoutingDataSource {
  5. /**
  6. * 从数据源中获取目标数据源的key
  7. * @return
  8. */
  9. @Override
  10. protected Object determineCurrentLookupKey() {
  11. // 从ThreadLocal中获取key
  12. String dataSourceKey = DynamicDataSourceHolder.getDataSource();
  13. if (StringUtils.isEmpty(dataSourceKey)) {
  14. return DSConst.默认;
  15. }
  16. return dataSourceKey;
  17. }
  18. }

AOP 实现

  1. /**
  2. * @Summary 数据源切换AOP
  3. */
  4. @Slf4j
  5. @Aspect
  6. @Service
  7. public class DynamicDataSourceAOP {
  8. public DynamicDataSourceAOP() {
  9. log.info("/*---------------------------------------*/");
  10. log.info("/*---------- ----------*/");
  11. log.info("/*---------- 动态数据源初始化... ----------*/");
  12. log.info("/*---------- ----------*/");
  13. log.info("/*---------------------------------------*/");
  14. }
  15. /**
  16. * 切点
  17. */
  18. @Pointcut(value = "@annotation(xxx.xxx.MyDS)")
  19. private void method(){}
  20. /**
  21. * 方法执行前,切换到指定的数据源
  22. * @param point
  23. */
  24. @Before("method()")
  25. public void before(JoinPoint point) {
  26. MethodSignature methodSignature = (MethodSignature) point.getSignature();
  27. //获取被代理的方法对象
  28. Method targetMethod = methodSignature.getMethod();
  29. //获取被代理方法的注解信息
  30. CultureDS cultureDS = AnnotationUtils.findAnnotation(targetMethod, CultureDS.class);
  31. // 方法链条最外层的动态数据源注解优先级最高
  32. String key = DynamicDataSourceHolder.getDataSource();
  33. if (!StringUtils.isEmpty(key)) {
  34. log.warn("提醒:动态数据源注解调用链上出现覆盖场景,请确认是否无问题");
  35. return;
  36. }
  37. if (cultureDS != null ) {
  38. //设置数据库标志
  39. DynamicDataSourceHolder.setDataSource(MyDS.value());
  40. }
  41. }
  42. /**
  43. * 释放数据源
  44. */
  45. @AfterReturning("method()")
  46. public void doAfter() {
  47. DynamicDataSourceHolder.removeDataSource();
  48. }
  49. }

DataSourceConfig 配置

通过以下代码来将动态数据源配置到 SqlSession 中去

  1. /**
  2. * 数据源的一些配置,主要是配置读写分离的sqlsession,这里没有使用mybatis annotation
  3. *
  4. @Configuration
  5. @EnableTransactionManagement
  6. @EnableAspectJAutoProxy
  7. class DataSourceConfig {
  8. /** 可读写的SQL Session */
  9. public static final String BEANNAME_SQLSESSION_COMMON = "sqlsessionCommon";
  10. /** 事务管理器的名称,如果有多个事务管理器时,需要指定beanName */
  11. public static final String BEANNAME_TRANSACTION_MANAGER = "transactionManager";
  12. /** 主数据源,必须配置,spring启动时会执行初始化数据操作(无论是否真的需要),选择查找DataSource class类型的数据源 配置通用数据源,可读写,连接的是主库 */
  13. @Bean
  14. @Primary
  15. @ConfigurationProperties(prefix = "datasource.common")
  16. public DataSource datasourceCommon() {
  17. // 数据源配置 可更换为其他实现方式
  18. return DataSourceBuilder.create().build();
  19. }
  20. /**
  21. * 动态数据源
  22. * @returnr
  23. */
  24. @Bean
  25. public DynamicDataSource dynamicDataSource() {
  26. DynamicDataSource dynamicDataSource = new DynamicDataSource();
  27. LinkedHashMap<Object, Object> hashMap = Maps.newLinkedHashMap();
  28. hashMap.put(DSConst.默认, datasourceCommon());
  29. hashMap.put(DSConst.主库, datasourceCommon());
  30. hashMap.put(DSConst.从库, datasourceReadOnly());
  31. hashMap.put(DSConst.统计, datasourceStat());
  32. // 初始化数据源 Map
  33. dynamicDataSource.setTargetDataSources(hashMap);
  34. dynamicDataSource.setDefaultTargetDataSource(datasourceCommon());
  35. return dynamicDataSource;
  36. }
  37. /**
  38. * 配置事务管理器
  39. */
  40. @Primary
  41. @Bean(name = BEANNAME_TRANSACTION_MANAGER)
  42. public DataSourceTransactionManager createDataSourceTransactionManager2() {
  43. DataSource dataSource = this.dynamicDataSource();
  44. DataSourceTransactionManager manager = new DataSourceTransactionManager(dataSource);
  45. return manager;
  46. }
  47. /**
  48. * 配置读写sqlsession
  49. */
  50. @Primary
  51. @Bean(name = BEANNAME_SQLSESSION_COMMON)
  52. public SqlSession readWriteSqlSession() throws Exception {
  53. SqlSessionFactoryBean factory = new SqlSessionFactoryBean();
  54. // 设置动态数据源
  55. factory.setDataSource(this.dynamicDataSource());
  56. PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
  57. factory.setConfigLocation(resolver.getResource("mybatis/mybatis-config.xml"));
  58. factory.setMapperLocations(resolver.getResources("mybatis/mappers/**/*.xml"));
  59. return new SqlSessionTemplate(factory.getObject());
  60. }
  61. }

总结

综上,利用AOP+注解实现了一个简单的Spring动态数据源功能,使用的时候,仅需要在目标方法上加上 @MyDS 注解即可。许多开源组件,会在现有的基础上增加一个扩展功能,比如路由策略等等。
顺便聊一下 sharding-jdbc 的实现方式,更新写入类sql自动走主库,查询类自动走读库,如果是新项目无历史债务的话,是可以使用该方案的。如果是在原有旧的项目上进行读写分离改造,那如果使用了 sharding-jdbc 读写分离方案,就必须梳理已有代码逻辑中的sql调用情况,来避免主从延迟造成数据不一致对业务的影响。
主从延迟造成读取数据不一致的情况是指:主从在同步的时候是有一定的延迟时间的,不管是什么网络的情况,这个延迟的值都是存在的,一般在毫秒级左右。这个时候如果使用sharding-jdbc进行读写分离处理,进行实时数据插入并查询判断的时候,就会出现判断异常的情况。