前言

在《Cache组件-热key收集》中,我们已经介绍过热key是什么,热key的危害,如何收集统计;
最后引出了一些热key防护的方案,本文为多级缓存解决方案。

适用场景

  1. 长期不修改,如配置类数据
  2. 查询流量大,如秒杀商品信息等
  3. 非强一致性,允许分布式情况下的修改与查询结果不匹配(毫秒级)

    本地缓存选型

    | 开源组件 | 优点 | 缺点 | | —- | —- | —- | | EHCache | Encache是一个纯Java的进程内缓存框架,是Hibernate中默认de CacheProvider。同Caffeine和Guava Cache相比,Encache的功能更加丰富,扩展性更强,支持多种缓存淘汰算法,包括LRU、LFU和FIFO,缓存支持堆内存储、堆外存储、磁盘存储(支持持久化)三种,支持多种集群方案,解决数据共享问题。 | Encache在性能上不及Caffeine和Guava Cache | | Guava Cache | Guava Cache继承了ConcurrentHashMap的思路,使用多个segments方式的细粒度锁,在保证线程安全的同时,支持高并发场景需求。 | 无明显缺点,性能不及Caffeine | | Caffeine | Caffeine内部采用了一种结合LRU、LFU优点的算法:W-TinyLFU,在性能上比Guava cache更加优秀,Caffeine的API基本和Guava cache一样。 | 无明显缺点 |

综合考虑,使用 Caffeine

常见的多级缓存方案

image.png

角色

  1. pod
  2. worker
  3. 注册中心

    流程

  4. POD集群服务与worker集群启动时,将自身信息注册到注册中心;

  5. POD集群服务与worker集群直连;
  6. RedisClient客户端收集redis热key统计信息,上报给worker;
  7. worker对热key进行分析,如果触发阈值,通知所有POD,将热key进行本地缓存;
  8. 如果热key有修改,同样的方式通知所有服务修改;

    优点

    能解决热key问题,并能第一时间通知所有POD进行本地缓存;

    缺点

    需要额外引入worker集群与注册中心,复杂度飙升;

    Cache组件方案

    1、整体设计

    image.png

    优点

    1. 组件基于Redis的发布订阅实现分布式缓存的通知,降低整体复杂度;
    2. 组件内部集成本地缓存,使用更简单;

      缺点

    3. 将高可用完全寄托于Redis,如果redis异常,则依赖本地缓存的淘汰策略,因此超时时间不能设置太长;

    4. 使用者需要区分哪些数据结构的命令支持本地缓存,哪些不支持;

      2、组件详细设计

      image.png

      3、介绍

      命令生命周期

      image.png

    5. 通过CacheExecutor执行器调用命令,执行器通过handle调度器调用,handle调度器将命令通过本地缓存工厂 RedisLocalCacheFactory包装重写;

    6. 重写后的命令,优先判断key是否需要本地缓存,然后根据数据类型(目前只支持String、Hash、Set)选择本地缓存实现类,再根据本地缓存执行策略调用:

      1. NONE:直接调用命令的apply进行redis操作并返回
      2. GET:通过本地缓存执行器执行get操作,判断是否需要通知其他POD,需要则通过发布订阅通知;
      3. SET:通过本地缓存执行器执行set操作,并通知其他POD;

        4、具体代码

        工厂 RedisLocalCacheFactory ```java /**
      • @author JerryLong
      • @version V1.0
      • @Title: RedisLocalCacheFactory
      • @Description: 本地缓存工厂
      • @date 2021/8/24 5:20 PM / public class RedisLocalCacheFactory { /*
      • 支持本地缓存设置最大数量 / private static final int KEY_MAX_SIZE = 200; /*
      • 短暂的本地缓存key,只支持5.1秒 / private static final int LOCAL_TRANSIENCE_KEY_MILLISECONDS = 5100; /*
      • 需要长期本地缓存的key列表 / private static Map> longTimesKeys = new ConcurrentHashMap<>(); /*
      • 短暂本地缓存的key前缀 / private static final String TRANSIENCE_KEY_PREFIX = “hot-key-“; /*
      • 数值1,需要推送 / public static final int KEY_STATUS_NEED_PUBLISH = 1; /*
      • 数值2,已经推送 / public static final int KEY_STATUS_ALREADY_PUBLISH = 2; /*
      • 热key本地缓存 */ private static Cache transienceLocalKeys = Caffeine.newBuilder() //设置cache的初始大小为10,要合理设置该值 .initialCapacity(10) //最大值 .maximumSize(2000) //在写入后开始计时,在指定的时间后过期。 .expireAfterWrite(LOCAL_TRANSIENCE_KEY_MILLISECONDS, TimeUnit.MILLISECONDS) //构建cache实例 .build();

      private static Map localCacheHandleMap = new ConcurrentHashMap<>();

      private static AbstractLocalCacheHandle defaultHandle = new DefaultLocalCacheHandle();

      static { localCacheHandleMap.put(CommandsDataTypeEnum.STRING, new StringLocalCacheHandle()); localCacheHandleMap.put(CommandsDataTypeEnum.SET, new SetLocalCacheHandle()); localCacheHandleMap.put(CommandsDataTypeEnum.HASH, new HashLocalCacheHandle()); localCacheHandleMap.put(CommandsDataTypeEnum.LIST, new ListLocalCacheHandle()); localCacheHandleMap.put(CommandsDataTypeEnum.ZSET, new ZsetLocalCacheHandle()); }

      public static void registe(CommandsDataTypeEnum dataType, AbstractLocalCacheHandle handle) { localCacheHandleMap.put(dataType, handle); }

      /**

      • 本地缓存key发布订阅channel */ public static final String LOCAL_CACHE_KEY_PUBSUB_CHANNEL = “Lcache:local:channel:”;

      /**

      • 获取本地缓存执行器 *
      • @param dataType
      • @param executor
      • @return */ public static AbstractLocalCacheHandle getLocalCacheHandle(CommandsDataTypeEnum dataType, BaseCacheExecutor executor) { return localCacheHandleMap.getOrDefault(dataType, defaultHandle); }

      /**

      • 添加本地缓存的key *
      • @param key */ public static void addLocalCacheKey(BaseCacheExecutor executor, String key) { Set sets = longTimesKeys.computeIfAbsent(CacheConfigUtils.modelToHashKey(executor.getCacheConfigModel()), e -> new HashSet<>()); CacheExceptionFactory.throwException(sets.size() + 1 < KEY_MAX_SIZE, “本地缓存的key数量超出” + KEY_MAX_SIZE); sets.add(key); }
  1. /**
  2. * 添加本地缓存的key
  3. *
  4. * @param keys
  5. */
  6. public static void addLocalCacheKey(BaseCacheExecutor executor, Set<String> keys) {
  7. Set<String> sets = longTimesKeys.computeIfAbsent(CacheConfigUtils.modelToHashKey(executor.getCacheConfigModel()), e -> new HashSet<>());
  8. CacheExceptionFactory.throwException(sets.size() + keys.size() < KEY_MAX_SIZE, "本地缓存的key数量超出" + KEY_MAX_SIZE);
  9. sets.addAll(keys);
  10. }
  11. /**
  12. * 是否是本地缓存的key
  13. *
  14. * @param key
  15. * @return 1 需要推送,2 不需要推送
  16. */
  17. public static int isLocalCacheKey(BaseCacheExecutor executor, String key) {
  18. if (StringUtils.isBlank(key)) {
  19. return -1;
  20. }
  21. Set<String> sets = longTimesKeys.computeIfAbsent(CacheConfigUtils.modelToHashKey(executor.getCacheConfigModel()), e -> new HashSet<>());
  22. if (sets.contains(key)) {
  23. return KEY_STATUS_ALREADY_PUBLISH;
  24. }
  25. return (int) transienceLocalKeys.get(TRANSIENCE_KEY_PREFIX + key, e -> -1);
  26. }
  27. /**
  28. * 提交一个短暂本地缓存的key
  29. *
  30. * @param key
  31. * @param isNeedPublish 是否需要推送
  32. */
  33. public static void addTransienceLocalCacheKey(String key, boolean isNeedPublish) {
  34. transienceLocalKeys.put(TRANSIENCE_KEY_PREFIX + key, isNeedPublish ? KEY_STATUS_NEED_PUBLISH : KEY_STATUS_ALREADY_PUBLISH);
  35. }
  36. /**
  37. * 获取命令与key本地缓存执行策略
  38. *
  39. * @param commands
  40. * @param key
  41. * @return
  42. */
  43. public static RedisLocalCachePubHandle getRedisLocalCachePubHandle(BaseCacheExecutor executor, String commands, String key) {
  44. int isNeedPublish = isLocalCacheKey(executor, key);
  45. //不需要本地缓存的key,直接返回none
  46. if (isNeedPublish < 0) {
  47. return null;
  48. }
  49. return new RedisLocalCachePubHandle(isNeedPublish, CommandsDataTypeUtil.getHotKeyHandleType(commands));
  50. }
  51. /**
  52. * 本地缓存通知处理方式
  53. */
  54. public static class RedisLocalCachePubHandle {
  55. public RedisLocalCachePubHandle() {
  56. }
  57. public RedisLocalCachePubHandle(int isNeedPublish, LocalCacheHandleTypeEnum handleType) {
  58. this.isNeedPublish = isNeedPublish;
  59. this.handleType = handleType;
  60. }
  61. private int isNeedPublish;
  62. private LocalCacheHandleTypeEnum handleType;
  63. public int getIsNeedPublish() {
  64. return isNeedPublish;
  65. }
  66. public LocalCacheHandleTypeEnum getHandleType() {
  67. return handleType;
  68. }
  69. }

}

**本地缓存执行器抽象 AbstractLocalCacheHandle**
```java
/**
 * @author JerryLong
 * @version V1.0
 * @Title: AbstractLocalCacheHandle
 * @Description: 本地缓存执行器抽象类
 * @date 2021/11/9 2:34 PM
 */
public abstract class AbstractLocalCacheHandle {

    private static Cache<String, LocalCacheLifeCycle> LOCAL_CACHE = Caffeine.newBuilder()
            //设置cache的初始大小为10,要合理设置该值
            .initialCapacity(100)
            //最大值
            .maximumSize(10000)
            //在写入后开始计时,在指定的时间后过期。保留500毫秒
            .expireAfterWrite(500, TimeUnit.MILLISECONDS)
            //构建cache实例
            .build();

    /**
     * 获取,如果存在
     *
     * @param key
     * @return
     */
    public LocalCacheLifeCycle getIfPresent(BaseCacheExecutor executor, String key) {
        return LOCAL_CACHE.getIfPresent(getLocalKey(executor, key));
    }

    /**
     * 删除
     *
     * @param key
     */
    public static void del(String key) {
        LOCAL_CACHE.invalidate(key);
    }

    public static void del(BaseCacheExecutor executor, String key) {
        LOCAL_CACHE.invalidate(getLocalKey(executor.getCacheConfigModel(), key));
    }

    /**
     * 实现类需要指定自身数据类型
     *
     * @return
     */
    protected abstract CommandsDataTypeEnum getDataType();

    @PostConstruct
    public void regist() {
        RedisLocalCacheFactory.registe(getDataType(), this);
    }

    /**
     * 获取
     *
     * @param function
     * @param key
     * @param fields
     * @return
     */
    protected abstract Object get(BaseCacheExecutor executor, CacheFunction function, String key, Object[] fields);

    protected Object set(BaseCacheExecutor executor, CacheFunction function, String key) {
        try {
            return function.apply();
        } finally {
            //删除本地缓存
            LOCAL_CACHE.invalidate(key);
        }
    }

    /**
     * 缓存方法执行
     *
     * @param function
     * @param key
     * @param fields
     * @return
     */
    public Object doCacheFunc(BaseCacheExecutor executor, CacheFunction function, String key, Object[] fields) {
        if (executor.getCacheConfigModel().isLocalCache()) {
            //这一次命令是否不走本地缓存
            if (Boolean.TRUE.equals(executor.getNoLocalCacheOnce())) {
                try {
                    return function.apply();
                } finally {
                    executor.removeNoLocalCacheOnce();
                }
            }
            RedisLocalCacheFactory.RedisLocalCachePubHandle localCacheHandle = RedisLocalCacheFactory.getRedisLocalCachePubHandle(executor, function.fnToFnName(), key);
            if (null != localCacheHandle) {
                Object res = null;
                switch (localCacheHandle.getHandleType()) {
                    case NONE:
                        res = function.apply();
                        break;
                    case GET:
                        res = this.get(executor, function, key, fields);
                        //判断是否需要通知其他pod
                        if (localCacheHandle.getIsNeedPublish() == RedisLocalCacheFactory.KEY_STATUS_NEED_PUBLISH) {
                            //修改状态
                            RedisLocalCacheFactory.addTransienceLocalCacheKey(key, false);
                            LocalCachePublisher.publish(executor, key, true);
                        }
                        break;
                    case SET:
                        res = this.set(executor, function, getLocalKey(executor, key));
                        //通知其他pod,key是拼接后的
                        LocalCachePublisher.publish(executor, getLocalKey(executor, key), false);
                        break;
                    default:
                        res = function.apply();
                        break;
                }
                return res;
            }
        }
        return function.apply();
    }

    /**
     * 获取本地缓存的key
     *
     * @param key
     * @return
     */
    private String getLocalKey(BaseCacheExecutor executor, String key) {
        return getLocalKey(executor.getCacheConfigModel(), key);
    }

    protected static String getLocalKey(CacheConfigModel cacheConfigModel, String key) {
        return "l" + CacheConfigUtils.modelToHashKey(cacheConfigModel) + key;
    }

    /**
     * 获取cache
     *
     * @param key
     * @param check
     * @param loadData
     * @param <T>
     * @return
     */
    protected <T extends Object> Optional<LocalCacheLifeCycle<T>> getLocalCache(BaseCacheExecutor executor, String key, Function check, Function loadData) {
        key = getLocalKey(executor, key);
        @Nullable LocalCacheLifeCycle data = LOCAL_CACHE.getIfPresent(key);
        if (null == data) {
            loadAsync(key, check, loadData);
        }
        return Optional.ofNullable(data);
    }

    /**
     * 异步加载,如果在加载期间有修改,则放弃
     *
     * @param key
     */
    protected static void loadAsync(String key, Function checkLength, Function loadData) {
        CompletableFuture.runAsync(() -> {
            //判断是否能被加载
            LocalCacheLifeCycle cycle = LOCAL_CACHE.getIfPresent(key);
            if (!isCanLoad(cycle)) {
                return;
            }
            //判断长度
            if (null != checkLength) {
                if (!(boolean) checkLength.apply(null)) {
                    LOCAL_CACHE.put(key, new LocalCacheLifeCycle(LocalCacheStatus.CAN_NOT, null));
                    return;
                }
            }
            Object data = loadData.apply(null);
            //最后再判断一遍是否能被加载,并且判断版本号是否是最新的
            LocalCacheLifeCycle cycle1 = LOCAL_CACHE.getIfPresent(key);
            if (isCanLoad(cycle1)) {
                //判断版本号
                if (null != cycle && null != cycle1 && cycle1.getTimestamp() > cycle.getTimestamp()) {
                    return;
                }
                //缓存
                LOCAL_CACHE.put(key, new LocalCacheLifeCycle(LocalCacheStatus.CACHE, data));
            }
        });
    }

    /**
     * 是否可以被重新加载,
     * true : 初始化Null、已删除、已缓存
     *
     * @param cycle
     * @return
     */
    private static boolean isCanLoad(LocalCacheLifeCycle cycle) {
        return null == cycle || cycle.getLocalCacheStatus().equals(LocalCacheStatus.CACHE) || cycle.getLocalCacheStatus().equals(LocalCacheStatus.DEL);
    }

    /**
     * 本地缓存生命周期
     */
    public static class LocalCacheLifeCycle<T> {

        public LocalCacheLifeCycle() {
        }

        public LocalCacheLifeCycle(LocalCacheStatus localCacheStatus, T data) {
            this.localCacheStatus = localCacheStatus;
            this.timestamp = System.currentTimeMillis();
            this.data = data;
        }

        /**
         * 缓存状态
         */
        private LocalCacheStatus localCacheStatus;
        /**
         * 版本号,毫秒时间戳
         */
        private Long timestamp;
        /**
         * 缓存数据
         */
        private T data;

        public LocalCacheStatus getLocalCacheStatus() {
            return localCacheStatus;
        }

        public Long getTimestamp() {
            return timestamp;
        }

        public T getData() {
            return data;
        }
    }

    /**
     * key缓存状态
     */
    protected enum LocalCacheStatus {
        /**
         * 不能本地缓存
         */
        CAN_NOT,
        /**
         * 删除
         */
        DEL,
        /**
         * 加载中
         */
        LOADING,
        /**
         * 缓存中
         */
        CACHE;
    }
}

本地缓存热key发布 LocalCachePublisher

/**
 * @author JerryLong
 * @version V1.0
 * @Title: LocalCachePublisher
 * @Description: 热key消息发送方
 * @date 2021/8/24 7:06 PM
 */
public class LocalCachePublisher {

    /**
     * 通过pubsub通知其他节点
     *
     * @param executor
     * @param key
     * @param isNewKey true是新的key,false是修改的key
     */
    public static void publish(BaseCacheExecutor executor, String key, boolean isNewKey) {
        if (!MonitorFactory.isOpenHotKeyLocalCache()) {
            return;
        }
        if (null == executor) {
            return;
        }
        //通知其他pod
        executor.publishAsync(RedisLocalCacheFactory.LOCAL_CACHE_KEY_PUBSUB_CHANNEL + executor.getCacheConfigModel().getCacheType(), JSON.toJSONString(new HotKeySubscriptData(key, isNewKey ? -1 : 1)));
    }
}

本地缓存热key消费 LocalCacheSubscriber

/**
 * @author JerryLong
 * @version V1.0
 * @Title: LocalCacheSubscriber
 * @Description: 热key消息监听方
 * @date 2021/8/24 7:05 PM
 */
public class LocalCacheSubscriber {

    private static Map<String, Integer> subscribeInfo = new ConcurrentHashMap<>();

    /**
     * 增加监听
     *
     * @param executor
     */
    public static void addSubscriber(BaseCacheExecutor executor) {
        subscribeInfo.computeIfAbsent(CacheConfigUtils.modelToHashKeyNoUseType(executor.getCacheConfigModel()), e -> {
            executor.subscribe((message) -> manageMessage(JSON.parseObject(message, HotKeySubscriptData.class)), RedisLocalCacheFactory.LOCAL_CACHE_KEY_PUBSUB_CHANNEL + executor.getCacheConfigModel().getCacheType());
            return 1;
        });
    }

    /**
     * 管理消息
     *
     * @param hotKeySubscriptData
     */
    public static void manageMessage(HotKeySubscriptData hotKeySubscriptData) {
        if (!MonitorFactory.isOpenHotKeyLocalCache()) {
            return;
        }
        //如果是当前pod发的消息,不处理
        if (hotKeySubscriptData.isLocalHost()) {
            return;
        }
        if (hotKeySubscriptData.isNewKey()) {
            //新的key,本地缓存中注册这个key
            RedisLocalCacheFactory.addTransienceLocalCacheKey(hotKeySubscriptData.getKey(), false);
        } else {
            //修改,需要删除本地缓存
            AbstractLocalCacheHandle.del(hotKeySubscriptData.getKey());
        }
    }
}

本地缓存处理器实现(String、Hash、Set)

public class StringLocalCacheHandle extends AbstractLocalCacheHandle {

    @Override
    protected CommandsDataTypeEnum getDataType() {
        return CommandsDataTypeEnum.STRING;
    }

    @Override
    public Object get(BaseCacheExecutor executor, CacheFunction function, String key, Object[] fields) {
        return this.getLocalCache(executor, key, null, e -> executor.noLocalCacheOnce().get(key)).map(
                e -> {
                    //判断缓存状态
                    if (!e.getLocalCacheStatus().equals(LocalCacheStatus.CACHE)) {
                        return function.apply();
                    }
                    switch (function.fnToFnName()) {
                        case "get":
                            return e.getData();
                        case "substr":
                            if (null == e.getData()) {
                                return "";
                            }
                            return substr(e.getData().toString(), Integer.parseInt(fields[0].toString()), Integer.parseInt(fields[1].toString()));
                        case "strlen":
                            if (null == e.getData()) {
                                return 0L;
                            }
                            return Long.parseLong(String.valueOf(e.getData().toString().length()));
                        default:
                            return function.apply();
                    }
                }
        ).orElse(function.apply());
    }

    public static String substr(String v, int s, int e) {
        if (StringUtils.isBlank(v)) {
            return "";
        }
        return v.substring(s, javaEnd(v.length(), e));
    }

    public static int javaEnd(int length, int end) {
        if (0 < end) {
            int l = end + 1;
            return l > length ? length : l;
        } else if (0 == end) {
            return 1;
        } else {
            int l = length + 1 + end;
            return l < 1 ? 1 : l;
        }
    }
}

public class HashLocalCacheHandle extends AbstractLocalCacheHandle {

    @Override
    protected CommandsDataTypeEnum getDataType() {
        return CommandsDataTypeEnum.HASH;
    }

    private static final Integer HASH_MAX_LEN = 5000;

    /**
     * 如果key不存在,需要先load
     *
     * @param function
     * @param fields
     * @return
     */
    @Override
    public Object get(BaseCacheExecutor executor, CacheFunction function, String key, Object[] fields) {
        return this.getCacheMap(executor, key).map(e -> {
            //判断缓存状态
            if (!e.getLocalCacheStatus().equals(LocalCacheStatus.CACHE)) {
                return function.apply();
            }
            switch (function.fnToFnName()) {
                case "hget":
                    return e.getData().get(fields[0]);
                case "hmget":
                    List<Object> list = new ArrayList<>();
                    for (int i = 0; i < fields.length; i++) {
                        list.add(e.getData().get(fields[i]));
                    }
                    return list;
                case "hmgetToMap":
                    Map<String, Object> map = new HashMap<>(8);
                    Object o = null;
                    for (int i = 0; i < fields.length; i++) {
                        o = e.getData().get(fields[i]);
                        if (null != o) {
                            map.put(fields[i].toString(), o);
                        }
                    }
                    return map;
                case "hmgetToMapCanNull":
                    Map<String, Object> map1 = new HashMap<>(8);
                    for (int i = 0; i < fields.length; i++) {
                        map1.put(fields[i].toString(), e.getData().get(fields[i]));
                    }
                    return map1;
                case "hgetAll":
                    return e.getData();
                case "hkeys":
                    return e.getData().keySet();
                case "hlen":
                    return Long.valueOf(e.getData().size());
                case "hvals":
                    return new ArrayList<>(e.getData().values());
                case "hexists":
                    return e.getData().containsKey(fields[0]);
                case "hstrlen":
                    return e.getData().get(fields[0]).toString().length();
                default:
                    return function.apply();
            }
        }).orElse(function.apply());
    }

    /**
     * 获取cache
     *
     * @param key
     * @return
     */
    private Optional<LocalCacheLifeCycle<Map<String, Object>>> getCacheMap(BaseCacheExecutor executor, String key) {
        return getLocalCache(executor, key, e -> executor.noLocalCacheOnce().hlen(key) <= HASH_MAX_LEN, e -> executor.noLocalCacheOnce().hgetAll(key));
    }
}

public class SetLocalCacheHandle extends AbstractLocalCacheHandle {

    @Override
    protected CommandsDataTypeEnum getDataType() {
        return CommandsDataTypeEnum.SET;
    }

    private static final Integer SET_MAX_LEN = 5000;

    /**
     * 如果key不存在,需要先load
     *
     * @param function
     * @param fields
     * @return
     */
    @Override
    public Object get(BaseCacheExecutor executor, CacheFunction function, String key, Object[] fields) {
        return this.getCacheMap(executor, key).map(e -> {
            //判断缓存状态
            if (!e.getLocalCacheStatus().equals(LocalCacheStatus.CACHE)) {
                return function.apply();
            }
            switch (function.fnToFnName()) {
                case "sismember":
                    return e.getData().contains(fields[0]);
                case "smembers":
                    return e.getData();
                case "scard":
                    return Long.valueOf(e.getData().size());
                default:
                    return function.apply();
            }
        }).orElse(function.apply());
    }

    /**
     * 获取cache
     *
     * @param key
     * @return
     */
    private Optional<LocalCacheLifeCycle<Set<Object>>> getCacheMap(BaseCacheExecutor executor, String key) {
        return getLocalCache(executor, key, e -> executor.noLocalCacheOnce().scard(key) <= SET_MAX_LEN, e -> executor.noLocalCacheOnce().smembers(key));
    }
}