介绍
在Lcache组件中,对每个链接资源都通过ConnectResource进行管理
public class ConnectResource implements InterfaceConnectResource {
/**
* 连接资源控制锁,当资源替换时,通过此锁控制
*/
private StampedLock stampedLock = new StampedLock();
private InterfaceConnectResource connectResource;
public ConnectResource setConnectResource(InterfaceConnectResource connectResource) {
this.connectResource = connectResource;
return this;
}
@Override
public void close() throws Exception {
this.connectResource.close();
}
public StampedLock getStampedLock() {
return stampedLock;
}
public InterfaceConnectResource getResource() {
return this.connectResource;
}
}
每个ConnectResource中,都有一个StampedLock资源控制锁;
每个链接实现中,例如 AbstractLettuceHandleExecutor 的 getConnectResource 方法,在获取资源前,会先尝试拿到读锁,
�并判断是否需要加悲观读锁,并最后释放悲观读锁;
/**
* 获取资源
*
* @return
*/
@Override
public StatefulConnection getConnectResource() {
StatefulConnection statefulConnection = null;
long stamp = connectResource.getStampedLock().tryOptimisticRead();
statefulConnection = this.getStatefulConnection();
//判断是否需要加悲观读锁
if (!connectResource.getStampedLock().validate(stamp)) {
stamp = connectResource.getStampedLock().readLock();
try {
statefulConnection = this.getStatefulConnection();
} finally {
//释放读锁
connectResource.getStampedLock().unlockRead(stamp);
}
}
return statefulConnection;
}
使用
在 Lettuce 默认后置处理 LettuceHandlesPostProcessor 中,判断如果是单连接方式并且发生了连接关闭错误,则通过RedisConnectionManager 进行连接重置
//Lettuce非连接池方式增加连接重置功能
if (cacheHandleProcessorModel.getCacheConfigModel().getConnectTypeEnum().equals(ConnectTypeEnum.SIMPLE) && cacheHandleProcessorModel.getE().getMessage().contains(CONNECTION_CLOSE_ERROR)) {
//重置连接
try {
synchronized (cacheHandleProcessorModel.getCacheConfigModel()) {
//获取配置
BaseCacheConfig config = CacheExecutorFactory.getRedisSourceConfig(cacheHandleProcessorModel.getCacheConfigModel());
//判断连接是否有效
LettuceConnectResource resource = (LettuceConnectResource) RedisConnectionManager.getConnectionResource(cacheHandleProcessorModel.getCacheConfigModel(), config).getResource();
if (!resource.getStatefulRedisConnection().isOpen()) {
//重置连接
RedisConnectionManager.resetConnectionResource(cacheHandleProcessorModel.getCacheConfigModel(), config);
}
}
} catch (Exception e) {
CacheExceptionFactory.addErrorLog("LettuceHandlesPostProcessor reset connection fail !", e);
}
}
在30秒一次的心跳检测中,同样有此逻辑
public class HeartCheckScheduled extends CacheExecutorFactory {
private static final String CONNECTION_CLOSE_ERROR = "Connection is closed";
static {
/**
* 拿到所有连接,发送一条心跳命令,如果失败则重连
* 30秒1次
*/
AsyncExecutorUtils.submitScheduledTask(() -> {
//获取当前配置
for (BaseCacheExecutor baseCacheExecutor : executorMap.values()) {
if (baseCacheExecutor.getCacheConfigModel().getUseType().equals(UseTypeEnum.PUBSUB)) {
break;
}
try {
baseCacheExecutor.setex("cache:heart", 60, "1");
} catch (Exception e) {
CacheExceptionFactory.addWarnLog("HeartCheckScheduled", e, "心跳检测失败");
if (e.getMessage().contains(CONNECTION_CLOSE_ERROR)) {
//重新创建连接
try {
RedisConnectionManager.resetConnectionResource(baseCacheExecutor.getCacheConfigModel(), baseCacheExecutor.getRedisSourceConfig());
} catch (Exception e1) {
CacheExceptionFactory.addErrorLog("DbSourceScheduled", "dbConfigChange", "连接资源替换异常!", e1);
}
}
}
}
}, CacheBasicConfig.HEART_CHECK_lINTERVAL_SECONDS, CacheBasicConfig.HEART_CHECK_lINTERVAL_SECONDS, TimeUnit.SECONDS);
}
}
DB方式
/**
* 检测DB配置是否有变更
* 如果有变更,则创建新的连接替换原有的
* 间隔执行10秒
*/
AsyncExecutorUtils.submitScheduledTask(() -> {
//获取当前配置
CacheConfigModel cacheConfigModel = null;
String hashKey = null;
RedisDbConfigModel oldConfig = null;
RedisDbConfigModel newConfig = null;
for (String s : redisDbConfigModelMap.keySet()) {
try {
cacheConfigModel = CacheConfigUtils.hashKeyToModel(s);
hashKey = CacheConfigUtils.modelToHashKeyNoUseType(cacheConfigModel);
if (cacheConfigModel.getConfigSourceType() == CacheConfigSourceTypeEnum.DB) {
//获取DB配置,对比是否变更,如果变更
oldConfig = redisDbConfigModelMap.get(hashKey);
newConfig = getRedisDbConfigModelByDb(cacheConfigModel, false);
if (CacheConfigUtils.checkDbConfigIsChange(oldConfig, newConfig)) {
LOGGER.info("DbSourceScheduled config change ! oldConfig:[{}],newConfig:[{}]", JSON.toJSONString(oldConfig), JSON.toJSONString(newConfig));
//清理旧配置
redisDbConfigModelMap.remove(hashKey);
//替换连接
RedisConnectionManager.resetConnectionResource(cacheConfigModel, this.getConfig(cacheConfigModel));
//删除Redisson连接
RedissonClientManager.close(cacheConfigModel);
//重置配置
redisDbConfigModelMap.put(hashKey,newConfig);
}
}
} catch (Exception e) {
CacheExceptionFactory.addErrorLog("DbSourceScheduled", "dbConfigChange", "连接资源替换异常!", e);
}
}
}, CacheBasicConfig.DB_CONFIG_CHECK_lINTERVAL_SECONDS, CacheBasicConfig.DB_CONFIG_CHECK_lINTERVAL_SECONDS, TimeUnit.SECONDS);
Apollo方式
注:Apollo使用已被封装过
// 实时监听
config.addChangeListener(changeEvent -> {
if (changeEvent.getNamespace().equals(getApolloNamespace(cacheConfigModel))) {
if (changeEvent.isChanged(APOLLO_CONFIG_KEY)) {
LOGGER.info("ApolloCacheConfigManager->apolloConfigListener change ! changeEvent:[{}]", JSON.toJSONString(changeEvent));
//连接重置
RedisConnectionManager.resetConnectionResource(cacheConfigModel, this.getConfigByCacheModel(cacheConfigModel));
}
}
});