前言
在《Cache组件-热key收集》中,我们已经介绍过热key是什么,热key的危害,如何收集统计;
最后引出了一些热key防护的方案,本文为多级缓存解决方案。
适用场景
- 长期不修改,如配置类数据
- 查询流量大,如秒杀商品信息等
- 非强一致性,允许分布式情况下的修改与查询结果不匹配(毫秒级)
本地缓存选型
| 开源组件 | 优点 | 缺点 | | —- | —- | —- | | EHCache | Encache是一个纯Java的进程内缓存框架,是Hibernate中默认de CacheProvider。同Caffeine和Guava Cache相比,Encache的功能更加丰富,扩展性更强,支持多种缓存淘汰算法,包括LRU、LFU和FIFO,缓存支持堆内存储、堆外存储、磁盘存储(支持持久化)三种,支持多种集群方案,解决数据共享问题。 | Encache在性能上不及Caffeine和Guava Cache | | Guava Cache | Guava Cache继承了ConcurrentHashMap的思路,使用多个segments方式的细粒度锁,在保证线程安全的同时,支持高并发场景需求。 | 无明显缺点,性能不及Caffeine | | Caffeine | Caffeine内部采用了一种结合LRU、LFU优点的算法:W-TinyLFU,在性能上比Guava cache更加优秀,Caffeine的API基本和Guava cache一样。 | 无明显缺点 |
常见的多级缓存方案
角色
- pod
- worker
-
流程
POD集群服务与worker集群启动时,将自身信息注册到注册中心;
- POD集群服务与worker集群直连;
- RedisClient客户端收集redis热key统计信息,上报给worker;
- worker对热key进行分析,如果触发阈值,通知所有POD,将热key进行本地缓存;
-
优点
能解决热key问题,并能第一时间通知所有POD进行本地缓存;
缺点
Cache组件方案
1、整体设计
优点
- 组件基于Redis的发布订阅实现分布式缓存的通知,降低整体复杂度;
-
缺点
将高可用完全寄托于Redis,如果redis异常,则依赖本地缓存的淘汰策略,因此超时时间不能设置太长;
-
2、组件详细设计
3、介绍
命令生命周期
通过CacheExecutor执行器调用命令,执行器通过handle调度器调用,handle调度器将命令通过本地缓存工厂 RedisLocalCacheFactory包装重写;
重写后的命令,优先判断key是否需要本地缓存,然后根据数据类型(目前只支持String、Hash、Set)选择本地缓存实现类,再根据本地缓存执行策略调用:
- NONE:直接调用命令的apply进行redis操作并返回
- GET:通过本地缓存执行器执行get操作,判断是否需要通知其他POD,需要则通过发布订阅通知;
- SET:通过本地缓存执行器执行set操作,并通知其他POD;
4、具体代码
工厂 RedisLocalCacheFactory ```java /**
- @author JerryLong
- @version V1.0
- @Title: RedisLocalCacheFactory
- @Description: 本地缓存工厂
- @date 2021/8/24 5:20 PM / public class RedisLocalCacheFactory { /*
- 支持本地缓存设置最大数量 / private static final int KEY_MAX_SIZE = 200; /*
- 短暂的本地缓存key,只支持5.1秒 / private static final int LOCAL_TRANSIENCE_KEY_MILLISECONDS = 5100; /*
- 需要长期本地缓存的key列表
/
private static Map
> longTimesKeys = new ConcurrentHashMap<>(); /* - 短暂本地缓存的key前缀 / private static final String TRANSIENCE_KEY_PREFIX = “hot-key-“; /*
- 数值1,需要推送 / public static final int KEY_STATUS_NEED_PUBLISH = 1; /*
- 数值2,已经推送 / public static final int KEY_STATUS_ALREADY_PUBLISH = 2; /*
- 热key本地缓存
*/
private static Cache
transienceLocalKeys = Caffeine.newBuilder() //设置cache的初始大小为10,要合理设置该值 .initialCapacity(10) //最大值 .maximumSize(2000) //在写入后开始计时,在指定的时间后过期。 .expireAfterWrite(LOCAL_TRANSIENCE_KEY_MILLISECONDS, TimeUnit.MILLISECONDS) //构建cache实例 .build();
private static Map
localCacheHandleMap = new ConcurrentHashMap<>(); private static AbstractLocalCacheHandle defaultHandle = new DefaultLocalCacheHandle();
static { localCacheHandleMap.put(CommandsDataTypeEnum.STRING, new StringLocalCacheHandle()); localCacheHandleMap.put(CommandsDataTypeEnum.SET, new SetLocalCacheHandle()); localCacheHandleMap.put(CommandsDataTypeEnum.HASH, new HashLocalCacheHandle()); localCacheHandleMap.put(CommandsDataTypeEnum.LIST, new ListLocalCacheHandle()); localCacheHandleMap.put(CommandsDataTypeEnum.ZSET, new ZsetLocalCacheHandle()); }
public static void registe(CommandsDataTypeEnum dataType, AbstractLocalCacheHandle handle) { localCacheHandleMap.put(dataType, handle); }
/**
- 本地缓存key发布订阅channel */ public static final String LOCAL_CACHE_KEY_PUBSUB_CHANNEL = “Lcache:local:channel:”;
/**
- 获取本地缓存执行器 *
- @param dataType
- @param executor
- @return */ public static AbstractLocalCacheHandle getLocalCacheHandle(CommandsDataTypeEnum dataType, BaseCacheExecutor executor) { return localCacheHandleMap.getOrDefault(dataType, defaultHandle); }
/**
- 添加本地缓存的key *
- @param key
*/
public static void addLocalCacheKey(BaseCacheExecutor executor, String key) {
Set
sets = longTimesKeys.computeIfAbsent(CacheConfigUtils.modelToHashKey(executor.getCacheConfigModel()), e -> new HashSet<>()); CacheExceptionFactory.throwException(sets.size() + 1 < KEY_MAX_SIZE, “本地缓存的key数量超出” + KEY_MAX_SIZE); sets.add(key); }
/**
* 添加本地缓存的key
*
* @param keys
*/
public static void addLocalCacheKey(BaseCacheExecutor executor, Set<String> keys) {
Set<String> sets = longTimesKeys.computeIfAbsent(CacheConfigUtils.modelToHashKey(executor.getCacheConfigModel()), e -> new HashSet<>());
CacheExceptionFactory.throwException(sets.size() + keys.size() < KEY_MAX_SIZE, "本地缓存的key数量超出" + KEY_MAX_SIZE);
sets.addAll(keys);
}
/**
* 是否是本地缓存的key
*
* @param key
* @return 1 需要推送,2 不需要推送
*/
public static int isLocalCacheKey(BaseCacheExecutor executor, String key) {
if (StringUtils.isBlank(key)) {
return -1;
}
Set<String> sets = longTimesKeys.computeIfAbsent(CacheConfigUtils.modelToHashKey(executor.getCacheConfigModel()), e -> new HashSet<>());
if (sets.contains(key)) {
return KEY_STATUS_ALREADY_PUBLISH;
}
return (int) transienceLocalKeys.get(TRANSIENCE_KEY_PREFIX + key, e -> -1);
}
/**
* 提交一个短暂本地缓存的key
*
* @param key
* @param isNeedPublish 是否需要推送
*/
public static void addTransienceLocalCacheKey(String key, boolean isNeedPublish) {
transienceLocalKeys.put(TRANSIENCE_KEY_PREFIX + key, isNeedPublish ? KEY_STATUS_NEED_PUBLISH : KEY_STATUS_ALREADY_PUBLISH);
}
/**
* 获取命令与key本地缓存执行策略
*
* @param commands
* @param key
* @return
*/
public static RedisLocalCachePubHandle getRedisLocalCachePubHandle(BaseCacheExecutor executor, String commands, String key) {
int isNeedPublish = isLocalCacheKey(executor, key);
//不需要本地缓存的key,直接返回none
if (isNeedPublish < 0) {
return null;
}
return new RedisLocalCachePubHandle(isNeedPublish, CommandsDataTypeUtil.getHotKeyHandleType(commands));
}
/**
* 本地缓存通知处理方式
*/
public static class RedisLocalCachePubHandle {
public RedisLocalCachePubHandle() {
}
public RedisLocalCachePubHandle(int isNeedPublish, LocalCacheHandleTypeEnum handleType) {
this.isNeedPublish = isNeedPublish;
this.handleType = handleType;
}
private int isNeedPublish;
private LocalCacheHandleTypeEnum handleType;
public int getIsNeedPublish() {
return isNeedPublish;
}
public LocalCacheHandleTypeEnum getHandleType() {
return handleType;
}
}
}
**本地缓存执行器抽象 AbstractLocalCacheHandle**
```java
/**
* @author JerryLong
* @version V1.0
* @Title: AbstractLocalCacheHandle
* @Description: 本地缓存执行器抽象类
* @date 2021/11/9 2:34 PM
*/
public abstract class AbstractLocalCacheHandle {
private static Cache<String, LocalCacheLifeCycle> LOCAL_CACHE = Caffeine.newBuilder()
//设置cache的初始大小为10,要合理设置该值
.initialCapacity(100)
//最大值
.maximumSize(10000)
//在写入后开始计时,在指定的时间后过期。保留500毫秒
.expireAfterWrite(500, TimeUnit.MILLISECONDS)
//构建cache实例
.build();
/**
* 获取,如果存在
*
* @param key
* @return
*/
public LocalCacheLifeCycle getIfPresent(BaseCacheExecutor executor, String key) {
return LOCAL_CACHE.getIfPresent(getLocalKey(executor, key));
}
/**
* 删除
*
* @param key
*/
public static void del(String key) {
LOCAL_CACHE.invalidate(key);
}
public static void del(BaseCacheExecutor executor, String key) {
LOCAL_CACHE.invalidate(getLocalKey(executor.getCacheConfigModel(), key));
}
/**
* 实现类需要指定自身数据类型
*
* @return
*/
protected abstract CommandsDataTypeEnum getDataType();
@PostConstruct
public void regist() {
RedisLocalCacheFactory.registe(getDataType(), this);
}
/**
* 获取
*
* @param function
* @param key
* @param fields
* @return
*/
protected abstract Object get(BaseCacheExecutor executor, CacheFunction function, String key, Object[] fields);
protected Object set(BaseCacheExecutor executor, CacheFunction function, String key) {
try {
return function.apply();
} finally {
//删除本地缓存
LOCAL_CACHE.invalidate(key);
}
}
/**
* 缓存方法执行
*
* @param function
* @param key
* @param fields
* @return
*/
public Object doCacheFunc(BaseCacheExecutor executor, CacheFunction function, String key, Object[] fields) {
if (executor.getCacheConfigModel().isLocalCache()) {
//这一次命令是否不走本地缓存
if (Boolean.TRUE.equals(executor.getNoLocalCacheOnce())) {
try {
return function.apply();
} finally {
executor.removeNoLocalCacheOnce();
}
}
RedisLocalCacheFactory.RedisLocalCachePubHandle localCacheHandle = RedisLocalCacheFactory.getRedisLocalCachePubHandle(executor, function.fnToFnName(), key);
if (null != localCacheHandle) {
Object res = null;
switch (localCacheHandle.getHandleType()) {
case NONE:
res = function.apply();
break;
case GET:
res = this.get(executor, function, key, fields);
//判断是否需要通知其他pod
if (localCacheHandle.getIsNeedPublish() == RedisLocalCacheFactory.KEY_STATUS_NEED_PUBLISH) {
//修改状态
RedisLocalCacheFactory.addTransienceLocalCacheKey(key, false);
LocalCachePublisher.publish(executor, key, true);
}
break;
case SET:
res = this.set(executor, function, getLocalKey(executor, key));
//通知其他pod,key是拼接后的
LocalCachePublisher.publish(executor, getLocalKey(executor, key), false);
break;
default:
res = function.apply();
break;
}
return res;
}
}
return function.apply();
}
/**
* 获取本地缓存的key
*
* @param key
* @return
*/
private String getLocalKey(BaseCacheExecutor executor, String key) {
return getLocalKey(executor.getCacheConfigModel(), key);
}
protected static String getLocalKey(CacheConfigModel cacheConfigModel, String key) {
return "l" + CacheConfigUtils.modelToHashKey(cacheConfigModel) + key;
}
/**
* 获取cache
*
* @param key
* @param check
* @param loadData
* @param <T>
* @return
*/
protected <T extends Object> Optional<LocalCacheLifeCycle<T>> getLocalCache(BaseCacheExecutor executor, String key, Function check, Function loadData) {
key = getLocalKey(executor, key);
@Nullable LocalCacheLifeCycle data = LOCAL_CACHE.getIfPresent(key);
if (null == data) {
loadAsync(key, check, loadData);
}
return Optional.ofNullable(data);
}
/**
* 异步加载,如果在加载期间有修改,则放弃
*
* @param key
*/
protected static void loadAsync(String key, Function checkLength, Function loadData) {
CompletableFuture.runAsync(() -> {
//判断是否能被加载
LocalCacheLifeCycle cycle = LOCAL_CACHE.getIfPresent(key);
if (!isCanLoad(cycle)) {
return;
}
//判断长度
if (null != checkLength) {
if (!(boolean) checkLength.apply(null)) {
LOCAL_CACHE.put(key, new LocalCacheLifeCycle(LocalCacheStatus.CAN_NOT, null));
return;
}
}
Object data = loadData.apply(null);
//最后再判断一遍是否能被加载,并且判断版本号是否是最新的
LocalCacheLifeCycle cycle1 = LOCAL_CACHE.getIfPresent(key);
if (isCanLoad(cycle1)) {
//判断版本号
if (null != cycle && null != cycle1 && cycle1.getTimestamp() > cycle.getTimestamp()) {
return;
}
//缓存
LOCAL_CACHE.put(key, new LocalCacheLifeCycle(LocalCacheStatus.CACHE, data));
}
});
}
/**
* 是否可以被重新加载,
* true : 初始化Null、已删除、已缓存
*
* @param cycle
* @return
*/
private static boolean isCanLoad(LocalCacheLifeCycle cycle) {
return null == cycle || cycle.getLocalCacheStatus().equals(LocalCacheStatus.CACHE) || cycle.getLocalCacheStatus().equals(LocalCacheStatus.DEL);
}
/**
* 本地缓存生命周期
*/
public static class LocalCacheLifeCycle<T> {
public LocalCacheLifeCycle() {
}
public LocalCacheLifeCycle(LocalCacheStatus localCacheStatus, T data) {
this.localCacheStatus = localCacheStatus;
this.timestamp = System.currentTimeMillis();
this.data = data;
}
/**
* 缓存状态
*/
private LocalCacheStatus localCacheStatus;
/**
* 版本号,毫秒时间戳
*/
private Long timestamp;
/**
* 缓存数据
*/
private T data;
public LocalCacheStatus getLocalCacheStatus() {
return localCacheStatus;
}
public Long getTimestamp() {
return timestamp;
}
public T getData() {
return data;
}
}
/**
* key缓存状态
*/
protected enum LocalCacheStatus {
/**
* 不能本地缓存
*/
CAN_NOT,
/**
* 删除
*/
DEL,
/**
* 加载中
*/
LOADING,
/**
* 缓存中
*/
CACHE;
}
}
本地缓存热key发布 LocalCachePublisher
/**
* @author JerryLong
* @version V1.0
* @Title: LocalCachePublisher
* @Description: 热key消息发送方
* @date 2021/8/24 7:06 PM
*/
public class LocalCachePublisher {
/**
* 通过pubsub通知其他节点
*
* @param executor
* @param key
* @param isNewKey true是新的key,false是修改的key
*/
public static void publish(BaseCacheExecutor executor, String key, boolean isNewKey) {
if (!MonitorFactory.isOpenHotKeyLocalCache()) {
return;
}
if (null == executor) {
return;
}
//通知其他pod
executor.publishAsync(RedisLocalCacheFactory.LOCAL_CACHE_KEY_PUBSUB_CHANNEL + executor.getCacheConfigModel().getCacheType(), JSON.toJSONString(new HotKeySubscriptData(key, isNewKey ? -1 : 1)));
}
}
本地缓存热key消费 LocalCacheSubscriber
/**
* @author JerryLong
* @version V1.0
* @Title: LocalCacheSubscriber
* @Description: 热key消息监听方
* @date 2021/8/24 7:05 PM
*/
public class LocalCacheSubscriber {
private static Map<String, Integer> subscribeInfo = new ConcurrentHashMap<>();
/**
* 增加监听
*
* @param executor
*/
public static void addSubscriber(BaseCacheExecutor executor) {
subscribeInfo.computeIfAbsent(CacheConfigUtils.modelToHashKeyNoUseType(executor.getCacheConfigModel()), e -> {
executor.subscribe((message) -> manageMessage(JSON.parseObject(message, HotKeySubscriptData.class)), RedisLocalCacheFactory.LOCAL_CACHE_KEY_PUBSUB_CHANNEL + executor.getCacheConfigModel().getCacheType());
return 1;
});
}
/**
* 管理消息
*
* @param hotKeySubscriptData
*/
public static void manageMessage(HotKeySubscriptData hotKeySubscriptData) {
if (!MonitorFactory.isOpenHotKeyLocalCache()) {
return;
}
//如果是当前pod发的消息,不处理
if (hotKeySubscriptData.isLocalHost()) {
return;
}
if (hotKeySubscriptData.isNewKey()) {
//新的key,本地缓存中注册这个key
RedisLocalCacheFactory.addTransienceLocalCacheKey(hotKeySubscriptData.getKey(), false);
} else {
//修改,需要删除本地缓存
AbstractLocalCacheHandle.del(hotKeySubscriptData.getKey());
}
}
}
本地缓存处理器实现(String、Hash、Set)
public class StringLocalCacheHandle extends AbstractLocalCacheHandle {
@Override
protected CommandsDataTypeEnum getDataType() {
return CommandsDataTypeEnum.STRING;
}
@Override
public Object get(BaseCacheExecutor executor, CacheFunction function, String key, Object[] fields) {
return this.getLocalCache(executor, key, null, e -> executor.noLocalCacheOnce().get(key)).map(
e -> {
//判断缓存状态
if (!e.getLocalCacheStatus().equals(LocalCacheStatus.CACHE)) {
return function.apply();
}
switch (function.fnToFnName()) {
case "get":
return e.getData();
case "substr":
if (null == e.getData()) {
return "";
}
return substr(e.getData().toString(), Integer.parseInt(fields[0].toString()), Integer.parseInt(fields[1].toString()));
case "strlen":
if (null == e.getData()) {
return 0L;
}
return Long.parseLong(String.valueOf(e.getData().toString().length()));
default:
return function.apply();
}
}
).orElse(function.apply());
}
public static String substr(String v, int s, int e) {
if (StringUtils.isBlank(v)) {
return "";
}
return v.substring(s, javaEnd(v.length(), e));
}
public static int javaEnd(int length, int end) {
if (0 < end) {
int l = end + 1;
return l > length ? length : l;
} else if (0 == end) {
return 1;
} else {
int l = length + 1 + end;
return l < 1 ? 1 : l;
}
}
}
public class HashLocalCacheHandle extends AbstractLocalCacheHandle {
@Override
protected CommandsDataTypeEnum getDataType() {
return CommandsDataTypeEnum.HASH;
}
private static final Integer HASH_MAX_LEN = 5000;
/**
* 如果key不存在,需要先load
*
* @param function
* @param fields
* @return
*/
@Override
public Object get(BaseCacheExecutor executor, CacheFunction function, String key, Object[] fields) {
return this.getCacheMap(executor, key).map(e -> {
//判断缓存状态
if (!e.getLocalCacheStatus().equals(LocalCacheStatus.CACHE)) {
return function.apply();
}
switch (function.fnToFnName()) {
case "hget":
return e.getData().get(fields[0]);
case "hmget":
List<Object> list = new ArrayList<>();
for (int i = 0; i < fields.length; i++) {
list.add(e.getData().get(fields[i]));
}
return list;
case "hmgetToMap":
Map<String, Object> map = new HashMap<>(8);
Object o = null;
for (int i = 0; i < fields.length; i++) {
o = e.getData().get(fields[i]);
if (null != o) {
map.put(fields[i].toString(), o);
}
}
return map;
case "hmgetToMapCanNull":
Map<String, Object> map1 = new HashMap<>(8);
for (int i = 0; i < fields.length; i++) {
map1.put(fields[i].toString(), e.getData().get(fields[i]));
}
return map1;
case "hgetAll":
return e.getData();
case "hkeys":
return e.getData().keySet();
case "hlen":
return Long.valueOf(e.getData().size());
case "hvals":
return new ArrayList<>(e.getData().values());
case "hexists":
return e.getData().containsKey(fields[0]);
case "hstrlen":
return e.getData().get(fields[0]).toString().length();
default:
return function.apply();
}
}).orElse(function.apply());
}
/**
* 获取cache
*
* @param key
* @return
*/
private Optional<LocalCacheLifeCycle<Map<String, Object>>> getCacheMap(BaseCacheExecutor executor, String key) {
return getLocalCache(executor, key, e -> executor.noLocalCacheOnce().hlen(key) <= HASH_MAX_LEN, e -> executor.noLocalCacheOnce().hgetAll(key));
}
}
public class SetLocalCacheHandle extends AbstractLocalCacheHandle {
@Override
protected CommandsDataTypeEnum getDataType() {
return CommandsDataTypeEnum.SET;
}
private static final Integer SET_MAX_LEN = 5000;
/**
* 如果key不存在,需要先load
*
* @param function
* @param fields
* @return
*/
@Override
public Object get(BaseCacheExecutor executor, CacheFunction function, String key, Object[] fields) {
return this.getCacheMap(executor, key).map(e -> {
//判断缓存状态
if (!e.getLocalCacheStatus().equals(LocalCacheStatus.CACHE)) {
return function.apply();
}
switch (function.fnToFnName()) {
case "sismember":
return e.getData().contains(fields[0]);
case "smembers":
return e.getData();
case "scard":
return Long.valueOf(e.getData().size());
default:
return function.apply();
}
}).orElse(function.apply());
}
/**
* 获取cache
*
* @param key
* @return
*/
private Optional<LocalCacheLifeCycle<Set<Object>>> getCacheMap(BaseCacheExecutor executor, String key) {
return getLocalCache(executor, key, e -> executor.noLocalCacheOnce().scard(key) <= SET_MAX_LEN, e -> executor.noLocalCacheOnce().smembers(key));
}
}