执行热key是什么
redis中访问最多的key列表,称之为热key。
当到达一定量级时,需要及时解决避免出现更严重的问题。
热key的产生
热点问题产生的原因大致有以下两种:
- 请求过于集中,单个key没有进行分片存储在一个物理机上,大量请求集中在这一个物理机;
-
热key的危害
热key过期时,大量的请求穿透到DB。
//组件提供通用方法,通过Redisson分布式锁与DoubleCheck方式实现防穿透
baseCacheExecutor.getCacheData(
new CacheDataBuilder(
//加锁的key,默认等待10秒,执行超时30秒,锁等待失败会尝试从缓存中再拿一次
"LOCK.KEY",
//从缓存中获取应用信息
e -> baseCacheExecutor.get("xxx"),
//如果没拿到,从DB中获取
e -> DB.DATA,
//如果缓存为空,将DB中拿到的数据缓存
dbData -> baseCacheExecutor.save(dbData)
)
);
请求流量过于集中,达到单节点QPS上限,导致redis集群整体不可用,从而导致缓存穿透、DB不可用。该问题的挑战在于它无法通过增加机器容量来解决。
热key的收集
一、开发人员根据业务经验评估哪些是热key
如活动类、商品秒杀类等热点数据,在上线前就可以判断出是热key。可以提前预热将缓存数据存储到多台redis物理机,或者进行多级缓存。缺点就是对开发人员要求较高,并且不能保证每次都能预估。
二、用redis自带命令
- monitor命令,可以实时抓取redis服务器接收到的命令,通过脚本统计出热key。有开源工具可通过此命令分析,如:redis-faina。缺点:高并发下会导致内存暴增并损耗性能,因此不可取。
redis4.0.3版本提供的热key统计功能,通过修改redis配置中的淘汰策略,指定为LFU,redis使用此淘汰策略时同时记录了key的访问频次,通过redis-cli —hotkeys可以输出最高频次的16个key,详细介绍:https://developer.aliyun.com/article/278922
三、在Proxy层做收集
如下图所示,阿里云redis实例中有一层proxy,可以在Proxy层做收集上报,但是缺点很明显,并非所有的redis集群架构都有proxy,并且阿里云redis不会开放这样的接口。
四、在客户端进行收集
通过kafka等方式上报redis执行的命令,汇总到统一worker进行计算。缺点:对worker的压力过大,增加了额外的复杂度。
- 并不是所有命令都需要上报计算,因此可以通过redis客户端进行命令的收集,先在本地计算,再对结果收集。
Cache组件的实现
Cache组件采用上述第四个方案中的第二种实现。
上图为cache组件整体执行流程
- Cache组件执行器通过Monitor模块中的MonitorProducer生产命令的监控数据,并将监控数据推入MonitorDataQueue阻塞队列。
- MonitorConsumer消费MonitorDataQueue阻塞队列中的监控数据,并分发到Monitor实现类Counter、Timer、HotKey
- Counter与Timer通过tag进行各维度的统计,并每分钟推送到MonitorDB(InfluxDb)
- HotKey通过HotKeyStatistics进行热key统计计算,并定时输出热key log文件
运维通过消费热keyLog文件,收集汇总到MonitorDB
热key计算实现代码
生产监控数据
//生产监控消息
MonitorProducer.addCommands(
MonitorFactory.MonitorData.builder()
.cacheType(cacheHandleProcessorModel.getCacheConfigModel().getCacheType())
.commands(cacheHandleProcessorModel.getCommands())
.key(cacheHandleProcessorModel.getKey())
.keys(cacheHandleProcessorModel.getKeys())
.result(null != cacheHandleProcessorModel.getE() ? false : true)
.executeTime(cacheHandleProcessorModel.getExecuteTime().intValue())
);
/**
* 监控实体信息
*/
MonitorData {
//redis连接类型
private String cacheType;
//redis操作命令
private String commands;
//操作的key
private String key;
//操作的key列表
private List<String> keys;
//命令执行时间
private int executeTime;
//命令执行结果
private boolean result;
}
消费监控数据
```java /**
监控接口 */ public interface Monitor {
/**
- 执行监控 *
- @param monitorData 监控实体
- @return */ Object doMonitor(MonitorFactory.MonitorData monitorData); }
//消费线程池 private static ThreadPoolExecutor consumeThreadPool = new ThreadPoolExecutor(1, 5, 0, TimeUnit.SECONDS, new SynchronousQueue<>(), new DefaultThreadFactory(“cache-monitor-consume”));
//执行消费线程 consumeThreadPool.execute(new ConsumeRunnable(-1)); //增加消费队列监控,如果队列长度超出,请检查消费逻辑 AsyncExecutorUtils.submitScheduledTask(() -> { //如果队列长度超出,增加error日志 if (MonitorFactory.MONITOR_QUEUE.size() > MonitorFactory.QUEUE_MONITOR_SIZE) { CacheExceptionFactory.addWarnLog(“monitor 消费阻塞警告!剩余消费数值超过 “ + MonitorFactory.QUEUE_MONITOR_SIZE); //增加一个30秒消费线程 /consumeThreadPool.execute(new ConsumeRunnable(30000L));/ } }, 5, 5, TimeUnit.SECONDS);
/**
消费任务 */ static class ConsumeRunnable implements Runnable {
private long executeTime = -1;
public ConsumeRunnable() { }
//大于0则有结束时间,单位:毫秒 public ConsumeRunnable(long executeTime) {
if (executeTime > 0) {
this.executeTime = System.currentTimeMillis() + executeTime;
}
}
@Override public void run() {
while (executeTime < 0 || executeTime > System.currentTimeMillis()) { //消费监听 try { MonitorFactory.MonitorData take = MonitorFactory.MONITOR_QUEUE.take(); if (null != take) { //通过监控模块工厂获取监控实现 MonitorFactory.MONITOR_MAP.entrySet().forEach(e -> { try { e.getValue().doMonitor(take); } catch (Exception e1) { CacheExceptionFactory.addWarnLog("MonitorProducer comsume error ! monitorType:{}", e1, e.getKey().name()); } }); } } catch (InterruptedException e) { CacheExceptionFactory.addWarnLog("MonitorProducer comsume error !", e); } }
监控模块扩展HotKeyMonitorConsumer实现
```java /**
热key监控消费实现,继承了Monitor接口 */ public class CacheHotKeyMonitorConsumer extends AbstractCacheMonitorConsumer { @Autowired private static HotKeyMonitor hotKeyMonitor;
@Override public Object doMonitor(MonitorFactory.MonitorData monitorData) {
if (monitorData.getResult()) { if (StringUtils.isNotBlank(monitorData.getKey())) { hotKeyMonitor.doMonitor(monitorData.getCacheType(), monitorData.getCommands(), monitorData.getKey()); } else if (CollectionUtils.isNotEmpty(monitorData.getKeys())) { monitorData.getKeys().stream().forEach(e -> hotKeyMonitor.doMonitor(monitorData.getCacheType(), monitorData.getCommands(), e)); } } return null;
此时已拿到监控数据,开始热key计算
热key计算有2种实现
1、使用优先级队列PriorityQueue
- 优点:计算速度快,本机压测10秒计算7.5KW,约为跳表方式的3倍
- 缺点:内存要求比跳表方式稍高
2、使用跳表实现的ConcurrentSkipListMap
- 思考:跳表key为score,value为member,如何反转并通过member排序?
- 优点:内存要求低
缺点:计算速度不如PriorityQueue,但本机压测10秒计算2.5KW,足以支撑单节点redis命令; ```java /**
热key计算抽象,封装了key自增与热key操作,定义计算实现抽象方法 / public abstract class AbstractHotKeyStatistics implements HotKeyStatistics { /*
- 热key统计容量大小 */ protected static final int STATISTIC_CAPACITY = CacheBasicConfig.HOTE_KEY_STATISTIC_CAPACITY;
/**
- 热key统计起始值 */ protected static final int HOT_KEY_COUNT_LEAST_VALUE = CacheBasicConfig.HOT_KEY_COUNT_LEAST_VALUE;
/**
- 读写锁 */ protected StampedLock stampedLock = new StampedLock();
/**
- key自增 *
- @param commands
- @param key */ protected abstract void incr(String commands, String key);
/**
- 获取热key列表 *
- @return
*/
protected abstract List
getHotkeys();
/**
- 清空 */ protected abstract void clean();
/**
- 获取热key *
- @return
*/
public List
getHotKeysAndClean() { long writeLock = stampedLock.writeLock(); try {
} finally {List<HotKeyItem> hotkeys = this.getHotkeys(); if (CollectionUtils.isNotEmpty(hotkeys)) { this.clean(); } return hotkeys;
} }stampedLock.unlockWrite(writeLock);
/**
- key自增 *
- @param commands
- @param key
*/
public void hotKeyIncr(String commands, String key) {
long readLock = stampedLock.readLock();
try {
} finally {this.incr(commands, key);
} } }stampedLock.unlockRead(readLock);
/**
热key实体 */ public class HotKeyItem implements Serializable {
/* / private static final long serialVersionUID = 1L;
public HotKeyItem(String commands, String key, LongAdder count) {
this.commands = commands; this.key = key; this.count = count;
}
private String commands;
private String key;
private LongAdder count;
@Override public boolean equals(Object o) {
if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } HotKeyItem that = (HotKeyItem) o; return Objects.equals(key, that.key);
}
@Override public int hashCode() {
return Objects.hash(key);
}
public void setKey(String key) {
this.key = key;
}
public void setCount(LongAdder count) {
this.count = count;
}
public String getKey() {
return key;
}
public LongAdder getCount() {
return count;
}
public String getCommands() {
return commands;
}
public void setCommands(String commands) {
this.commands = commands;
}
/**
防止expire等命令导致数据类型错乱 */ public void calibrationCommands(String commands) { if (!commands.equals(this.commands)) {
if (!CommandsDataTypeEnum.getCommandsDataType(commands).equals(CommandsDataTypeEnum.STRING)) { this.commands = commands; }
} }
@Override public String toString() { return “HotKeyItem{“ +
"commands='" + commands + '\'' + ", key='" + key + '\'' + ", count=" + count + '}';
} }
/**
优先队列方式实现 / public class PriorityQueueStatistics extends AbstractHotKeyStatistics { /*
- 数量限制,只保留指定数量的元素
最小堆,即频率最低的在队头,每次将新元素与堆头的比较,留下其中较大者 */ private PriorityQueue
hotKeyQueue = new PriorityQueue<>(STATISTIC_CAPACITY, (e1, e2) -> e1.getCount().intValue() > e2.getCount().intValue() ? 1 : 0); private Set
existedSet = new HashSet<>(); private ConcurrentHashMap
hotkeyMap = new ConcurrentHashMap<>(); private ReentrantLock incrLock = new ReentrantLock();
@Override public List
getHotkeys() { return Arrays.asList(hotKeyQueue.toArray(new HotKeyItem[]{})); } @Override public void clean() { hotkeyMap.clear(); existedSet.clear(); hotKeyQueue.clear(); }
@Override public void incr(String commands, String key) { HotKeyItem item = hotkeyMap.computeIfAbsent(key,e->new HotKeyItem(commands, key, new LongAdder())); incrLock.lock(); try {
//自增 item.calibrationCommands(commands); item.getCount().increment(); //如果未达到起始值,不计入热key if (item.getCount().intValue() < HOT_KEY_COUNT_LEAST_VALUE) { return; } //如果队列中存在对象 if (existedSet.contains(key)) { hotKeyQueue.remove(item); hotKeyQueue.offer(item); } else { //如果队列长度不足 if (hotKeyQueue.size() < STATISTIC_CAPACITY) { hotKeyQueue.remove(item); hotKeyQueue.offer(item); existedSet.add(key); return; } //弹出首位最小的进行对比,如果大于则插入 HotKeyItem head = hotKeyQueue.peek(); if (item.getCount().intValue() > head.getCount().intValue()) { hotKeyQueue.poll(); hotKeyQueue.offer(item); existedSet.remove(head.getKey()); existedSet.add(key); } }
} finally {
incrLock.unlock();
} } }
/**
跳表方式实现 */ public class SkipMapStatistics extends AbstractHotKeyStatistics {
private Map
map = new ConcurrentHashMap<>(); private ConcurrentSkipListMap
hotkeyMap = new ConcurrentSkipListMap<>((e1, e2) -> { if (e1.getKey().equals(e2.getKey())) { return 0; } return e1.getCount().intValue() == e2.getCount().intValue() ? e1.getKey().compareTo(e2.getKey()) : e1.getCount().intValue() > e2.getCount().intValue() ? 1 : -1; });
private ReentrantLock incrLock = new ReentrantLock();
@Override public List
getHotkeys() { return hotkeyMap.keySet().stream().collect(Collectors.toList());
}
@Override public void clean() {
map.clear(); hotkeyMap.clear();
}
@Override public void incr(String commands, String key) {
HotKeyItem item = map.computeIfAbsent(key, e -> new HotKeyItem(commands, key, new LongAdder())); incrLock.lock(); try { //必须先删除 hotkeyMap.remove(item); item.calibrationCommands(commands); item.getCount().increment(); //如果未达到起始值,不计入热key if (item.getCount().intValue() < HOT_KEY_COUNT_LEAST_VALUE) { return; } hotkeyMap.put(item, key); //如果队列长度不足STATISTIC_CAPACITY if (hotkeyMap.size() <= STATISTIC_CAPACITY) { return; } //弹出首位最小的进行对比,如果大于则插入 Map.Entry<HotKeyItem, String> first = hotkeyMap.firstEntry(); if (item.getCount().intValue() > first.getKey().getCount().intValue()) { hotkeyMap.pollFirstEntry(); hotkeyMap.put(item, key); } } finally { incrLock.unlock(); }
附跳表与优先级队列的性能对比
```java public static void main(String[] args) { ConcurrentSkipListMap
map = new ConcurrentSkipListMap(); PriorityQueue priorityQueue = new PriorityQueue(); LongAdder longAdder1 = new LongAdder(); LongAdder longAdder2 = new LongAdder(); new Thread(()->{
while (true){ longAdder1.increment(); map.put(ThreadLocalRandom.current().nextInt(0,100),Thread.currentThread().getName() + ThreadLocalRandom.current().nextInt(0,100)); map.firstEntry(); map.pollFirstEntry(); }
}).start(); new Thread(()->{
while (true){ longAdder2.increment(); String s = Thread.currentThread().getName() + ThreadLocalRandom.current().nextInt(0, 100); priorityQueue.offer(ThreadLocalRandom.current().nextInt(0,100)); priorityQueue.peek(); priorityQueue.poll(); }
}).start(); try {
Thread.sleep(10000L); System.out.println("跳表写读:" + longAdder1.intValue()); System.out.println("优先级队列写读:" + longAdder2.intValue());
} catch (InterruptedException e) {
e.printStackTrace();
} }
//结果 跳表写读:101227515 优先级队列写读:248118910 ```
热key收集
思路: