CentralDogma

配置中心(也可以换成其他存储,比如数据库等),核心作用是存储数据源配置信息,也可以结合 OpenMetadata 进行资产管理。

  <dependency>
      <groupId>com.linecorp.centraldogma</groupId>
      <artifactId>centraldogma-client-spring-boot-starter</artifactId>
      <version>0.51.1</version>
  </dependency>
  centraldogma:
    enable: true
    hosts:
      - localhost:36462
    access-token: appToken-d
    project: datasource
    repo: dev
    filename: /conf.json
  [
    {
      "id": "61e9107e283fcc36e66213a1",
      "alias": "mysql",
      "url": "",
      "username": "root",
      "password": "123456",
      "supportPool": true,
      "maximumPoolSize": 10,
      "maxLifetime": 2,
      "driver": "com.mysql.jdbc.Driver",
      "connectionTestQuery": "select 1"
    }
  ]

MyDataSource

MyDataSource 有两个成员变量:dataSourceManager 和 dataSource。前者保存来自配置中心的配置数据,后者是实际创建出来的数据源,DataSource 接口中声明的方法都委托给它。

  1. public class MyDataSource implements DataSource {
  2. @Setter
  3. private DataSourceManager dataSourceManager;
  4. private DataSource dataSource;
  5. @SneakyThrows
  6. protected void init() {
  7. if (dataSourceManager.getSupportPool()) {
  8. HikariConfig config = new HikariConfig();
  9. config.setUsername(dataSourceManager.getUsername());
  10. config.setJdbcUrl(dataSourceManager.getUrl());
  11. config.setPassword(dataSourceManager.getPassword());
  12. config.setMaximumPoolSize(dataSourceManager.getMaximumPoolSize());
  13. config.setMaxLifetime(dataSourceManager.getMaxLifetime());
  14. config.setDriverClassName(dataSourceManager.getDriver());
  15. config.setConnectionTestQuery(dataSourceManager.getConnectionTestQuery());
  16. this.dataSource = new HikariDataSource(config);
  17. } else {
  18. SimpleDriverDataSource dataSource = new SimpleDriverDataSource();
  19. dataSource.setUsername(dataSourceManager.getUsername());
  20. dataSource.setPassword(dataSourceManager.getPassword());
  21. dataSource.setUrl(dataSourceManager.getUrl());
  22. dataSource.setDriverClass((Class<? extends Driver>) Class.forName(dataSourceManager.getDriver()));
  23. this.dataSource = dataSource;
  24. }
  25. }
  26. @Override
  27. public Connection getConnection() throws SQLException {
  28. return dataSource.getConnection();
  29. }
  30. @Override
  31. public Connection getConnection(String s, String s1) throws SQLException {
  32. return dataSource.getConnection();
  33. }
  34. @Override
  35. public <T> T unwrap(Class<T> aClass) throws SQLException {
  36. return dataSource.unwrap(aClass);
  37. }
  38. @Override
  39. public boolean isWrapperFor(Class<?> aClass) throws SQLException {
  40. return dataSource.isWrapperFor(aClass);
  41. }
  42. @Override
  43. public PrintWriter getLogWriter() throws SQLException {
  44. return dataSource.getLogWriter();
  45. }
  46. @Override
  47. public void setLogWriter(PrintWriter printWriter) throws SQLException {
  48. dataSource.setLogWriter(printWriter);
  49. }
  50. @Override
  51. public int getLoginTimeout() throws SQLException {
  52. return dataSource.getLoginTimeout();
  53. }
  54. @Override
  55. public void setLoginTimeout(int i) throws SQLException {
  56. dataSource.setLoginTimeout(dataSource.getLoginTimeout());
  57. }
  58. @Override
  59. public Logger getParentLogger() throws SQLFeatureNotSupportedException {
  60. return dataSource.getParentLogger();
  61. }
  62. }

CommandLineRunner

  1. 监听配置中心传来的数据
  2. 循环注册bean
     /**
      * Attaches a Central Dogma file watcher to the configured project/repo/file and, on every
      * update, (re)registers one {@code MyDataSource} bean definition per configuration entry.
      *
      * <p>The watcher is attached while this bean method runs; the returned runner is a no-op.
      * Each bean is registered under the entry's {@code id}. If a definition with that name
      * already exists it is removed first, so a changed entry replaces the old definition.
      *
      * @param dogma              Central Dogma client used to create the file watcher
      * @param objectMapper       mapper that parses the watched file into {@code DataSourceManager} entries
      * @param applicationContext context whose bean factory receives the bean definitions
      * @return a runner that does nothing when invoked
      */
     @Bean
     @ConditionalOnProperty(prefix = "centraldogma", value = "enable", havingValue = "true")
     @ConditionalOnMissingBean
     public CommandLineRunner dataSourceCommandLineRunner(CentralDogma dogma, ObjectMapper objectMapper, ConfigurableApplicationContext applicationContext) {
         Watcher<String> watcher = dogma.fileWatcher(configuration.getProject(), configuration.getRepo(), Query.ofText(configuration.getFilename()));
         watcher.watch((revision, value) -> {
             // Log the revision only: the payload contains data source passwords.
             log.info("Updated at {}", revision);
             try {
                 BeanDefinitionRegistry beanFactory = (BeanDefinitionRegistry) applicationContext.getBeanFactory();
                 List<DataSourceManager> all = objectMapper.readValue(value, new TypeReference<List<DataSourceManager>>() {
                 });
                 for (DataSourceManager dataSourceManager : all) {
                     if (beanFactory.containsBeanDefinition(dataSourceManager.getId())) {
                         log.info("datasource exist id:{},name:{}", dataSourceManager.getId(), dataSourceManager.getAlias());
                         beanFactory.removeBeanDefinition(dataSourceManager.getId());
                         log.info("old datasource remove id:{},name:{}", dataSourceManager.getId(), dataSourceManager.getAlias());
                     }
                     BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(MyDataSource.class);
                     // Setter injection of the configuration entry.
                     beanDefinitionBuilder.addPropertyValue("dataSourceManager", dataSourceManager);
                     // init() builds the private delegate once the property has been set.
                     beanDefinitionBuilder.setInitMethodName("init");
                     beanFactory.registerBeanDefinition(dataSourceManager.getId(), beanDefinitionBuilder.getBeanDefinition());
                     log.info("datasource register success  id :{},alias:{}", dataSourceManager.getId(), dataSourceManager.getAlias());
                 }
             } catch (JsonProcessingException e) {
                 // The logger already records the stack trace; no printStackTrace() needed.
                 log.error("json convert  error", e);
             }
         });
         return args -> {
         };
     }
    

    注入

    由于监听是异步操作,注入时对应的 bean 可能尚未注册,故注入点应该使用懒加载(@Lazy)
     // Injected lazily because the bean is registered by the asynchronous configuration watcher
     // and may not exist yet when the owning bean is created. The qualifier is the bean name,
     // i.e. the "id" of the entry in the configuration file.
     @Autowired
     @Lazy
     @Qualifier("61e9107e283fcc36e66213a1")
     private DataSource dataSource;