LRU的实现已经不是第一次来做了,比如我曾经用C++和Python都实现过LRU,最近休年假躺家里,打算用Go实现一版LRU,准备应用到最近的项目里面去。一个LRU Cache可以应用的场景很多,比如在游戏领域我们需要对上线的玩家数据进行缓存,这个场景就很适合LRU Cache。玩家上线时先判断下玩家数据是否在cache中,不在就去读DB,回写LRU cache;如果直接命中Cache中就继续进行后面的登陆逻辑。假设我们的最大在线玩家数据LRU cache容量设置为20000,而我们日活预估为100000,那么应用LRU cache就很匹配这个场景,上线先LRU cache GET数据,玩家下线并不清理对应的缓存,而是用LRU策略进行淘汰。
Go实现一个并发安全的LRU Cache需要这些技术细节:
- 数据存储的数据结构使用双向链表,这里会用到标准库的的container/list
- 使用map来存储key,value存储指向链表元素的指针,我们通过map来快速判断key是否位于Cache中
- 使用sync.Mutex对cache的读写操作进行加锁,保证并发安全
- 利用函数指针做key淘汰时的回调,即key被淘汰时,会触发执行指定函数
- 记录cache的查询次数,以及命中次数,用于计算Cache的命中率
以下的LRU Cache就是按照上面的思路来进行设计实现的。
https://github.com/AstarLight/go-lru/blob/main/lru/lru.go
首先定义Cache类,尤其需要关注LRU Cache元素的存储方式,是使用了链表来存储元素。另外使用map来快速判断key是否在cache中。
type Key interface{}
type Cache struct {
MaxCapacity int
linklist *list.List //链表存储LRU元素
Callback func(key Key, value interface{}) // key失效时的回调
cache map[interface{}]*list.Element // map的key可以为任何类型,map的value是指向linklist的指针
NeedLock bool // 并发安全LRU需要锁
lock sync.Mutex
HitCnt int // 命中的次数
GetCnt int // 请求读的次数
}
new一个LRU cache 需要传入最大的内存容量、是否需要加锁、回调函数的函数指针。这里可以自行选择是否需要加锁,如果在并发场景下就必须加锁,如果应用场景是非并发场景,可以不加锁来提升读写cache速度。
func NewLruCache(maxCapacity int, needLock bool, fn func(key Key, value interface{})) *Cache {
if maxCapacity <= 0 {
return nil
}
return &Cache {
MaxCapacity:maxCapacity,
linklist:list.New(),
cache:make(map[interface{}]*list.Element),
NeedLock:needLock,
Callback:fn,
}
}
向cache添加一个元素。注意,如果是并发安全的cache需要在函数执行开始就要申请锁来加锁,如果cache容量已经达到阈值,那就先移除链表尾部的元素(最久未访问的元素),再往链表头插入新元素。记得在移除链表尾部元素时需要同时把cache的map key也一并移除,如果删除key后需要callback那就再执行下callback函数。
// 添加元素到lru cache
func (c *Cache) Add(key Key, val interface{}) {
if c.NeedLock {
c.lock.Lock()
}
defer c.lock.Unlock()
if c.cache == nil {
c.linklist = list.New()
}
// 检查是否到达容量最大值,到达最大值就需要移除链表尾部的元素
if c.Len() >= c.MaxCapacity {
c.removeOldest()
}
// 把元素添加到头部
elem := c.linklist.PushFront(&Kv{k:key, v:val})
c.cache[key] = elem
}
// 移除链表中最后的那个元素
func (c *Cache) removeOldest() {
if c.cache == nil {
return
}
oldest := c.linklist.Back()
if oldest == nil {
return
}
c.linklist.Remove(oldest)
kv := oldest.Value.(*Kv)
delete(c.cache, kv.k)
if c.Callback != nil {
c.Callback(kv.k, kv.v)
}
}
向LRU cache读数据。我们拿着key来向LRU cache读数据时,我们也需要先加锁,后面再判断key是否位于cache map中,如果不在就返回cache没有该数据的error,让上层自行决定下一步操作;如果key存在表明该数据在LRU cache中,我们可以直接读取数据,记得同时把链表中该元素的位置移到链表最前面。
// 从lru cache读取元素
func (c *Cache) Get(key Key) (val interface{}, err error) {
if c.NeedLock {
c.lock.Lock()
}
defer c.lock.Unlock()
c.GetCnt += 1
if c.cache == nil {
return nil, errors.New("lru cache is not exist")
}
elem, ok := c.cache[key]
if !ok {
return nil, errors.New("key not in cache")
}
c.HitCnt += 1
c.linklist.MoveToFront(elem)
return elem.Value.(*Kv).v, nil
}
最后是LRU的使用案例
var hotCache *lru.Cache
// LRU key 被删除时的回调通知
func keyDeleteNotify(key lru.Key, val interface{}) {
fmt.Printf("keyDeleteNotify, key=%+v, val=%+v\n", key, val)
}
func main() {
hotCache = lru.NewLruCache(3, true, keyDeleteNotify) // 启用一个容量为3,并发安全的LRU CACHE
for i := 0; i < 1000000; i++ {
go func(index int) {
key := fmt.Sprintf("key_%d", index%5)
hotCache.Add(key, index)
fmt.Printf("cache add, key=%+v, val=%+v\n", key, index)
val, err := hotCache.Get(key)
if err != nil {
//fmt.Printf("cache get error, key=%+v, err=%+v\n", key, err)
return
}
fmt.Printf("cache get OK, key=%+v, val=%+v\n", key, val)
}(i)
}
time.Sleep(time.Second * time.Duration(10))
fmt.Printf("cache GetCnt=%d, HitCnt=%d, HitPercent=%f\n", hotCache.GetCnt, hotCache.HitCnt, float64(hotCache.HitCnt)/float64(hotCache.GetCnt))
}