执行热key是什么

redis中访问最多的key列表,称之为热key。
当到达一定量级时,需要及时解决避免出现更严重的问题。

热key的产生

热点问题产生的原因大致有以下两种:

  1. 请求过于集中,单个key没有进行分片存储在一个物理机上,大量请求集中在这一个物理机;
  2. 如热卖商品、热点新闻、热点数据等导致流量突增;

    热key的危害

  3. 热key过期时,大量的请求穿透到DB。

    1. //组件提供通用方法,通过Redisson分布式锁与DoubleCheck方式实现防穿透
    2. baseCacheExecutor.getCacheData(
    3. new CacheDataBuilder(
    4. //加锁的key,默认等待10秒,执行超时30秒,锁等待失败会尝试从缓存中再拿一次
    5. "LOCK.KEY",
    6. //从缓存中获取应用信息
    7. e -> baseCacheExecutor.get("xxx"),
    8. //如果没拿到,从DB中获取
    9. e -> DB.DATA,
    10. //如果缓存为空,将DB中拿到的数据缓存
    11. dbData -> baseCacheExecutor.save(dbData)
    12. )
    13. );
  4. 请求流量过于集中,达到单节点QPS上限,导致redis集群整体不可用,从而导致缓存穿透、DB不可用。该问题的挑战在于它无法通过增加机器容量来解决。

    热key的收集

    一、开发人员根据业务经验评估哪些是热key

    如活动类、商品秒杀类等热点数据,在上线前就可以判断出是热key。可以提前预热将缓存数据存储到多台redis物理机,或者进行多级缓存。缺点就是对开发人员要求较高,并且不能保证每次都能预估。

    二、用redis自带命令

  • monitor命令,可以实时抓取redis服务器接收到的命令,通过脚本统计出热key。有开源工具可通过此命令分析,如:redis-faina。缺点:高并发下会导致内存暴增并损耗性能,因此不可取。
  • redis4.0.3版本提供的热key统计功能,通过修改redis配置中的淘汰策略,指定为LFU,redis使用此淘汰策略时同时记录了key的访问频次,通过redis-cli —hotkeys可以输出最高频次的16个key,详细介绍:https://developer.aliyun.com/article/278922

    三、在Proxy层做收集

    如下图所示,阿里云redis实例中有一层proxy,可以在Proxy层做收集上报,但是缺点很明显,并非所有的redis集群架构都有proxy,并且阿里云redis不会开放这样的接口。
    image.png

    四、在客户端进行收集

  • 通过kafka等方式上报redis执行的命令,汇总到统一worker进行计算。缺点:对worker的压力过大,增加了额外的复杂度。

  • 并不是所有命令都需要上报计算,因此可以通过redis客户端进行命令的收集,先在本地计算,再对结果收集。

    Cache组件的实现

    Cache组件采用上述第四个方案中的第二种实现。
    image.png
    上图为cache组件整体执行流程
  1. Cache组件执行器通过Monitor模块中的MonitorProducer生产命令的监控数据,并将监控数据推入MonitorDataQueue阻塞队列。
  2. MonitorConsumer消费MonitorDataQueue阻塞队列中的监控数据,并分发到Monitor实现类CounterTimerHotKey
  3. CounterTimer通过tag进行各维度的统计,并每分钟推送到MonitorDB(InfluxDb)
  4. HotKey通过HotKeyStatistics进行热key统计计算,并定时输出热key log文件
  5. 运维通过消费热keyLog文件,收集汇总到MonitorDB

    热key计算实现代码

    生产监控数据

    1. //生产监控消息
    2. MonitorProducer.addCommands(
    3. MonitorFactory.MonitorData.builder()
    4. .cacheType(cacheHandleProcessorModel.getCacheConfigModel().getCacheType())
    5. .commands(cacheHandleProcessorModel.getCommands())
    6. .key(cacheHandleProcessorModel.getKey())
    7. .keys(cacheHandleProcessorModel.getKeys())
    8. .result(null != cacheHandleProcessorModel.getE() ? false : true)
    9. .executeTime(cacheHandleProcessorModel.getExecuteTime().intValue())
    10. );
    11. /**
    12. * 监控实体信息
    13. */
    14. MonitorData {
    15. //redis连接类型
    16. private String cacheType;
    17. //redis操作命令
    18. private String commands;
    19. //操作的key
    20. private String key;
    21. //操作的key列表
    22. private List<String> keys;
    23. //命令执行时间
    24. private int executeTime;
    25. //命令执行结果
    26. private boolean result;
    27. }

    消费监控数据

    ```java /**

    • 监控接口 */ public interface Monitor {

      /**

      • 执行监控 *
      • @param monitorData 监控实体
      • @return */ Object doMonitor(MonitorFactory.MonitorData monitorData); }

//消费线程池 private static ThreadPoolExecutor consumeThreadPool = new ThreadPoolExecutor(1, 5, 0, TimeUnit.SECONDS, new SynchronousQueue<>(), new DefaultThreadFactory(“cache-monitor-consume”));

//执行消费线程 consumeThreadPool.execute(new ConsumeRunnable(-1)); //增加消费队列监控,如果队列长度超出,请检查消费逻辑 AsyncExecutorUtils.submitScheduledTask(() -> { //如果队列长度超出,增加error日志 if (MonitorFactory.MONITOR_QUEUE.size() > MonitorFactory.QUEUE_MONITOR_SIZE) { CacheExceptionFactory.addWarnLog(“monitor 消费阻塞警告!剩余消费数值超过 “ + MonitorFactory.QUEUE_MONITOR_SIZE); //增加一个30秒消费线程 /consumeThreadPool.execute(new ConsumeRunnable(30000L));/ } }, 5, 5, TimeUnit.SECONDS);

/**

  • 消费任务 */ static class ConsumeRunnable implements Runnable {

    private long executeTime = -1;

    public ConsumeRunnable() { }

    //大于0则有结束时间,单位:毫秒 public ConsumeRunnable(long executeTime) {

    1. if (executeTime > 0) {
    2. this.executeTime = System.currentTimeMillis() + executeTime;
    3. }

    }

    @Override public void run() {

     while (executeTime < 0 || executeTime > System.currentTimeMillis()) {
         //消费监听
         try {
             MonitorFactory.MonitorData take = MonitorFactory.MONITOR_QUEUE.take();
             if (null != take) {
                 //通过监控模块工厂获取监控实现
                 MonitorFactory.MONITOR_MAP.entrySet().forEach(e -> {
                     try {
                         e.getValue().doMonitor(take);
                     } catch (Exception e1) {
                         CacheExceptionFactory.addWarnLog("MonitorProducer comsume error ! monitorType:{}", e1, e.getKey().name());
                     }
                 });
             }
         } catch (InterruptedException e) {
             CacheExceptionFactory.addWarnLog("MonitorProducer comsume error !", e);
         }
     }
    

    } } ```

    监控模块扩展HotKeyMonitorConsumer实现

    ```java /**

  • 热key监控消费实现,继承了Monitor接口 */ public class CacheHotKeyMonitorConsumer extends AbstractCacheMonitorConsumer { @Autowired private static HotKeyMonitor hotKeyMonitor;

    @Override public Object doMonitor(MonitorFactory.MonitorData monitorData) {

     if (monitorData.getResult()) {
         if (StringUtils.isNotBlank(monitorData.getKey())) {
             hotKeyMonitor.doMonitor(monitorData.getCacheType(), monitorData.getCommands(), monitorData.getKey());
         } else if (CollectionUtils.isNotEmpty(monitorData.getKeys())) {
             monitorData.getKeys().stream().forEach(e -> hotKeyMonitor.doMonitor(monitorData.getCacheType(), monitorData.getCommands(), e));
         }
     }
     return null;
    

    } } ```

    此时已拿到监控数据,开始热key计算

    热key计算有2种实现
    1、使用优先级队列PriorityQueue

  • 优点:计算速度快,本机压测10秒计算7.5KW,约为跳表方式的3倍
  • 缺点:内存要求比跳表方式稍高

2、使用跳表实现的ConcurrentSkipListMap

  • 思考:跳表key为score,value为member,如何反转并通过member排序?
  • 优点:内存要求低
  • 缺点:计算速度不如PriorityQueue,但本机压测10秒计算2.5KW,足以支撑单节点redis命令; ```java /**

    • 热key计算抽象,封装了key自增与热key操作,定义计算实现抽象方法 / public abstract class AbstractHotKeyStatistics implements HotKeyStatistics { /*

      • 热key统计容量大小 */ protected static final int STATISTIC_CAPACITY = CacheBasicConfig.HOTE_KEY_STATISTIC_CAPACITY;

      /**

      • 热key统计起始值 */ protected static final int HOT_KEY_COUNT_LEAST_VALUE = CacheBasicConfig.HOT_KEY_COUNT_LEAST_VALUE;

      /**

      • 读写锁 */ protected StampedLock stampedLock = new StampedLock();

      /**

      • key自增 *
      • @param commands
      • @param key */ protected abstract void incr(String commands, String key);

      /**

      • 获取热key列表 *
      • @return */ protected abstract List getHotkeys();

      /**

      • 清空 */ protected abstract void clean();

      /**

      • 获取热key *
      • @return */ public List getHotKeysAndClean() { long writeLock = stampedLock.writeLock(); try {
         List<HotKeyItem> hotkeys = this.getHotkeys();
         if (CollectionUtils.isNotEmpty(hotkeys)) {
             this.clean();
         }
         return hotkeys;
        
        } finally {
         stampedLock.unlockWrite(writeLock);
        
        } }

      /**

      • key自增 *
      • @param commands
      • @param key */ public void hotKeyIncr(String commands, String key) { long readLock = stampedLock.readLock(); try {
         this.incr(commands, key);
        
        } finally {
         stampedLock.unlockRead(readLock);
        
        } } }

/**

  • 热key实体 */ public class HotKeyItem implements Serializable {

    /* / private static final long serialVersionUID = 1L;

    public HotKeyItem(String commands, String key, LongAdder count) {

     this.commands = commands;
     this.key = key;
     this.count = count;
    

    }

    private String commands;

    private String key;

    private LongAdder count;

    @Override public boolean equals(Object o) {

     if (this == o) {
         return true;
     }
     if (o == null || getClass() != o.getClass()) {
         return false;
     }
     HotKeyItem that = (HotKeyItem) o;
     return Objects.equals(key, that.key);
    

    }

    @Override public int hashCode() {

     return Objects.hash(key);
    

    }

    public void setKey(String key) {

     this.key = key;
    

    }

    public void setCount(LongAdder count) {

     this.count = count;
    

    }

    public String getKey() {

     return key;
    

    }

    public LongAdder getCount() {

     return count;
    

    }

    public String getCommands() {

     return commands;
    

    }

    public void setCommands(String commands) {

     this.commands = commands;
    

    }

    /**

    • 防止expire等命令导致数据类型错乱 */ public void calibrationCommands(String commands) { if (!commands.equals(this.commands)) {

       if (!CommandsDataTypeEnum.getCommandsDataType(commands).equals(CommandsDataTypeEnum.STRING)) {
           this.commands = commands;
       }
      

      } }

      @Override public String toString() { return “HotKeyItem{“ +

           "commands='" + commands + '\'' +
           ", key='" + key + '\'' +
           ", count=" + count +
           '}';
      

      } }

/**

  • 优先队列方式实现 / public class PriorityQueueStatistics extends AbstractHotKeyStatistics { /*

    • 数量限制,只保留指定数量的元素
    • 最小堆,即频率最低的在队头,每次将新元素与堆头的比较,留下其中较大者 */ private PriorityQueue hotKeyQueue = new PriorityQueue<>(STATISTIC_CAPACITY, (e1, e2) -> e1.getCount().intValue() > e2.getCount().intValue() ? 1 : 0);

      private Set existedSet = new HashSet<>();

      private ConcurrentHashMap hotkeyMap = new ConcurrentHashMap<>();

      private ReentrantLock incrLock = new ReentrantLock();

      @Override public List getHotkeys() { return Arrays.asList(hotKeyQueue.toArray(new HotKeyItem[]{})); }

      @Override public void clean() { hotkeyMap.clear(); existedSet.clear(); hotKeyQueue.clear(); }

      @Override public void incr(String commands, String key) { HotKeyItem item = hotkeyMap.computeIfAbsent(key,e->new HotKeyItem(commands, key, new LongAdder())); incrLock.lock(); try {

       //自增
       item.calibrationCommands(commands);
       item.getCount().increment();
       //如果未达到起始值,不计入热key
       if (item.getCount().intValue() < HOT_KEY_COUNT_LEAST_VALUE) {
           return;
       }
       //如果队列中存在对象
       if (existedSet.contains(key)) {
           hotKeyQueue.remove(item);
           hotKeyQueue.offer(item);
       } else {
           //如果队列长度不足
           if (hotKeyQueue.size() < STATISTIC_CAPACITY) {
               hotKeyQueue.remove(item);
               hotKeyQueue.offer(item);
               existedSet.add(key);
               return;
           }
           //弹出首位最小的进行对比,如果大于则插入
           HotKeyItem head = hotKeyQueue.peek();
           if (item.getCount().intValue() > head.getCount().intValue()) {
               hotKeyQueue.poll();
               hotKeyQueue.offer(item);
               existedSet.remove(head.getKey());
               existedSet.add(key);
           }
       }
      

      } finally {

       incrLock.unlock();
      

      } } }

/**

  • 跳表方式实现 */ public class SkipMapStatistics extends AbstractHotKeyStatistics {

    private Map map = new ConcurrentHashMap<>();

    private ConcurrentSkipListMap hotkeyMap =

         new ConcurrentSkipListMap<>((e1, e2) -> {
             if (e1.getKey().equals(e2.getKey())) {
                 return 0;
             }
             return e1.getCount().intValue() == e2.getCount().intValue() ? e1.getKey().compareTo(e2.getKey()) : e1.getCount().intValue() > e2.getCount().intValue() ? 1 : -1;
         });
    

    private ReentrantLock incrLock = new ReentrantLock();

    @Override public List getHotkeys() {

     return hotkeyMap.keySet().stream().collect(Collectors.toList());
    

    }

    @Override public void clean() {

     map.clear();
     hotkeyMap.clear();
    

    }

    @Override public void incr(String commands, String key) {

     HotKeyItem item = map.computeIfAbsent(key, e -> new HotKeyItem(commands, key, new LongAdder()));
     incrLock.lock();
     try {
         //必须先删除
         hotkeyMap.remove(item);
         item.calibrationCommands(commands);
         item.getCount().increment();
         //如果未达到起始值,不计入热key
         if (item.getCount().intValue() < HOT_KEY_COUNT_LEAST_VALUE) {
             return;
         }
    
         hotkeyMap.put(item, key);
         //如果队列长度不足STATISTIC_CAPACITY
         if (hotkeyMap.size() <= STATISTIC_CAPACITY) {
             return;
         }
         //弹出首位最小的进行对比,如果大于则插入
         Map.Entry<HotKeyItem, String> first = hotkeyMap.firstEntry();
         if (item.getCount().intValue() > first.getKey().getCount().intValue()) {
             hotkeyMap.pollFirstEntry();
             hotkeyMap.put(item, key);
         }
     } finally {
         incrLock.unlock();
     }
    

    } } ```

    附跳表与优先级队列的性能对比

    ```java public static void main(String[] args) { ConcurrentSkipListMap map = new ConcurrentSkipListMap(); PriorityQueue priorityQueue = new PriorityQueue(); LongAdder longAdder1 = new LongAdder(); LongAdder longAdder2 = new LongAdder();

    new Thread(()->{

     while (true){
         longAdder1.increment();
         map.put(ThreadLocalRandom.current().nextInt(0,100),Thread.currentThread().getName() + ThreadLocalRandom.current().nextInt(0,100));
         map.firstEntry();
         map.pollFirstEntry();
     }
    

    }).start(); new Thread(()->{

     while (true){
         longAdder2.increment();
         String s = Thread.currentThread().getName() + ThreadLocalRandom.current().nextInt(0, 100);
         priorityQueue.offer(ThreadLocalRandom.current().nextInt(0,100));
         priorityQueue.peek();
         priorityQueue.poll();
     }
    

    }).start(); try {

     Thread.sleep(10000L);
     System.out.println("跳表写读:" + longAdder1.intValue());
     System.out.println("优先级队列写读:" + longAdder2.intValue());
    

    } catch (InterruptedException e) {

     e.printStackTrace();
    

    } }

//结果 跳表写读:101227515 优先级队列写读:248118910 ```

热key收集

思路:

  1. 可以像玩吧Monitor组件处理的方式一样,通过Netty长连接将热key数据发送到worker服务。
  2. 玩吧通过Log4j,使用配置方式将热key数据写入文件。运维通过消费文件内容存储到时序数据库,最终呈现到Grafana监控报表上。

    热key的处理

    通过热key收集,已能监控到热key,可以考虑实现自动化防御。
    1、对key进行分片。
    2、通知pod进行本地缓存。