介绍

在 Lcache 组件中,每个连接资源都通过 ConnectResource 进行管理

  1. public class ConnectResource implements InterfaceConnectResource { // wrapper holding a replaceable connection resource together with its lock
  2. /**
  3. * Lock guarding the connection resource; access is coordinated through this lock while the resource is being replaced.
  4. */
  5. private StampedLock stampedLock = new StampedLock();
  6. private InterfaceConnectResource connectResource; // the wrapped resource; stays null until setConnectResource is called
  7. public ConnectResource setConnectResource(InterfaceConnectResource connectResource) { // stores the resource and returns this so calls can be chained
  8. this.connectResource = connectResource;
  9. return this;
  10. }
  11. @Override
  12. public void close() throws Exception { // delegates to the wrapped resource's close(); NOTE(review): throws NullPointerException if no resource has been set
  13. this.connectResource.close();
  14. }
  15. public StampedLock getStampedLock() { // exposes the lock so callers can take optimistic/read stamps around resource access
  16. return stampedLock;
  17. }
  18. public InterfaceConnectResource getResource() { // returns the currently wrapped resource (may be null)
  19. return this.connectResource;
  20. }
  21. }

每个ConnectResource中,都有一个StampedLock资源控制锁;
每个连接实现中,例如 AbstractLettuceHandleExecutor 的 getConnectResource 方法,在获取资源前,会先尝试获取乐观读标记(tryOptimisticRead),
并通过 validate 判断是否需要加悲观读锁;如果加了悲观读锁,则在重新获取资源后释放该读锁;

    /**
     * Returns the connection to use for the current operation.
     *
     * <p>The connection is first read under an optimistic stamp. If that stamp
     * does not validate, the read is repeated while holding the read lock, which
     * is released before returning.
     *
     * @return the value produced by {@code getStatefulConnection()}
     */
    @Override
    public StatefulConnection getConnectResource() {
        // Optimistic attempt: take a stamp, read, then check the stamp.
        final long optimisticStamp = connectResource.getStampedLock().tryOptimisticRead();
        final StatefulConnection optimisticResult = this.getStatefulConnection();
        if (connectResource.getStampedLock().validate(optimisticStamp)) {
            return optimisticResult;
        }

        // The stamp did not validate: read again under the (pessimistic) read lock.
        final long readStamp = connectResource.getStampedLock().readLock();
        try {
            return this.getStatefulConnection();
        } finally {
            // Always release the read lock acquired above.
            connectResource.getStampedLock().unlockRead(readStamp);
        }
    }

使用

在 Lettuce 默认后置处理 LettuceHandlesPostProcessor 中,判断如果是单连接方式并且发生了连接关闭错误,则通过RedisConnectionManager 进行连接重置

// Lettuce without a connection pool: reset the connection after a "connection closed" failure.
        // Act only when the connect type is SIMPLE and the failure message contains CONNECTION_CLOSE_ERROR.
        // The exception and its message are null-checked first: getMessage() may return null, and an
        // unguarded contains() would throw NullPointerException out of this handler.
        if (ConnectTypeEnum.SIMPLE.equals(cacheHandleProcessorModel.getCacheConfigModel().getConnectTypeEnum())
                && cacheHandleProcessorModel.getE() != null
                && cacheHandleProcessorModel.getE().getMessage() != null
                && cacheHandleProcessorModel.getE().getMessage().contains(CONNECTION_CLOSE_ERROR)) {
            try {
                // Hold the config model's monitor while checking and resetting the connection.
                synchronized (cacheHandleProcessorModel.getCacheConfigModel()) {
                    // Resolve the Redis source config for this model.
                    BaseCacheConfig config = CacheExecutorFactory.getRedisSourceConfig(cacheHandleProcessorModel.getCacheConfigModel());
                    // Look at the current connection; it is only reset when it is no longer open.
                    LettuceConnectResource resource = (LettuceConnectResource) RedisConnectionManager.getConnectionResource(cacheHandleProcessorModel.getCacheConfigModel(), config).getResource();
                    if (!resource.getStatefulRedisConnection().isOpen()) {
                        // Reset the connection.
                        RedisConnectionManager.resetConnectionResource(cacheHandleProcessorModel.getCacheConfigModel(), config);
                    }
                }
            } catch (Exception e) {
                // Best effort: a failed reset is logged and does not propagate.
                CacheExceptionFactory.addErrorLog("LettuceHandlesPostProcessor reset connection fail !", e);
            }
        }

在30秒一次的心跳检测中,同样有此逻辑

public class HeartCheckScheduled extends CacheExecutorFactory {

    private static final String CONNECTION_CLOSE_ERROR = "Connection is closed";

    static {

        /**
         * 拿到所有连接,发送一条心跳命令,如果失败则重连
         * 30秒1次
         */
        AsyncExecutorUtils.submitScheduledTask(() -> {
            //获取当前配置
            for (BaseCacheExecutor baseCacheExecutor : executorMap.values()) {
                if (baseCacheExecutor.getCacheConfigModel().getUseType().equals(UseTypeEnum.PUBSUB)) {
                    break;
                }
                try {
                    baseCacheExecutor.setex("cache:heart", 60, "1");
                } catch (Exception e) {
                    CacheExceptionFactory.addWarnLog("HeartCheckScheduled", e, "心跳检测失败");
                    if (e.getMessage().contains(CONNECTION_CLOSE_ERROR)) {
                        //重新创建连接
                        try {
                            RedisConnectionManager.resetConnectionResource(baseCacheExecutor.getCacheConfigModel(), baseCacheExecutor.getRedisSourceConfig());
                        } catch (Exception e1) {
                            CacheExceptionFactory.addErrorLog("DbSourceScheduled", "dbConfigChange", "连接资源替换异常!", e1);
                        }
                    }
                }
            }
        }, CacheBasicConfig.HEART_CHECK_lINTERVAL_SECONDS, CacheBasicConfig.HEART_CHECK_lINTERVAL_SECONDS, TimeUnit.SECONDS);
    }
}

在DB与Apollo等配置中,可通过此方式实现动态配置

DB方式

        /**
         * Polls for DB configuration changes.
         * When a change is detected, the connection is reset so that a new one replaces the existing one.
         * The interval is CacheBasicConfig.DB_CONFIG_CHECK_lINTERVAL_SECONDS (10 seconds per the original note).
         */
        AsyncExecutorUtils.submitScheduledTask(() -> {
            // Iterate over a snapshot of the keys: the loop body removes and re-inserts entries, which
            // would throw ConcurrentModificationException on a non-concurrent map's live key set.
            for (String s : new java.util.ArrayList<String>(redisDbConfigModelMap.keySet())) {
                try {
                    CacheConfigModel cacheConfigModel = CacheConfigUtils.hashKeyToModel(s);
                    String hashKey = CacheConfigUtils.modelToHashKeyNoUseType(cacheConfigModel);
                    if (cacheConfigModel.getConfigSourceType() != CacheConfigSourceTypeEnum.DB) {
                        continue;
                    }
                    // Load the current DB config and compare it with the cached one.
                    RedisDbConfigModel oldConfig = redisDbConfigModelMap.get(hashKey);
                    RedisDbConfigModel newConfig = getRedisDbConfigModelByDb(cacheConfigModel, false);
                    if (!CacheConfigUtils.checkDbConfigIsChange(oldConfig, newConfig)) {
                        continue;
                    }
                    LOGGER.info("DbSourceScheduled config change ! oldConfig:[{}],newConfig:[{}]", JSON.toJSONString(oldConfig), JSON.toJSONString(newConfig));
                    // Drop the old config before resetting.
                    redisDbConfigModelMap.remove(hashKey);
                    try {
                        // Replace the connection.
                        RedisConnectionManager.resetConnectionResource(cacheConfigModel, this.getConfig(cacheConfigModel));
                        // Close the Redisson client for this model.
                        RedissonClientManager.close(cacheConfigModel);
                    } catch (Exception resetFailure) {
                        // The reset failed after the old entry was removed. Put the old config back
                        // (unless something else already repopulated the key) so the key stays in the
                        // polling set and the change is detected again on the next run.
                        if (oldConfig != null) {
                            redisDbConfigModelMap.putIfAbsent(hashKey, oldConfig);
                        }
                        throw resetFailure;
                    }
                    // Store the new config.
                    redisDbConfigModelMap.put(hashKey, newConfig);
                } catch (Exception e) {
                    CacheExceptionFactory.addErrorLog("DbSourceScheduled", "dbConfigChange", "连接资源替换异常!", e);
                }
            }
        }, CacheBasicConfig.DB_CONFIG_CHECK_lINTERVAL_SECONDS, CacheBasicConfig.DB_CONFIG_CHECK_lINTERVAL_SECONDS, TimeUnit.SECONDS);

Apollo方式

注:Apollo使用已被封装过

// Real-time listener: reacts to Apollo change events for the cache configuration.
config.addChangeListener(changeEvent -> {
    // Ignore events that belong to a different namespace.
    if (!changeEvent.getNamespace().equals(getApolloNamespace(cacheConfigModel))) {
        return;
    }
    // Ignore events in which the cache config key itself did not change.
    if (!changeEvent.isChanged(APOLLO_CONFIG_KEY)) {
        return;
    }
    LOGGER.info("ApolloCacheConfigManager->apolloConfigListener change ! changeEvent:[{}]", JSON.toJSONString(changeEvent));
    // Reset the connection using the config resolved for this cache model.
    RedisConnectionManager.resetConnectionResource(cacheConfigModel, this.getConfigByCacheModel(cacheConfigModel));
});