我们考虑这样一个场景:我们读数据首先向redis查询,如果redis不存在我们再向DB查询,最后回写redis。利用这个缓存策略,可以避免直接并发读DB,进而保护了DB。如果我们有一个热点key,每秒有数万次读取,假设此时这个key并不存在(比如key超时失效了),我们海量的查询会绕过了redis直接向DB查询,此时DB压力瞬间上升,很有可能把DB打穿。这个情形我们一般称为缓存击穿,缓存击穿一定要在系统设计中考虑到,因为系统没有做好缓存击穿的周全应对,你的DB肯定会瘫痪。
Go中的singleflight(从字面翻译可以叫做单飞模式)是一种合并相同请求的处理方式,利用进程锁控制每个时刻最多只有一个相同的请求在执行,比如缓存击穿的场景,其实我们只要保证我们同一时刻只有一个请求去查DB,查完DB之后数据会回写到Redis,后续的请求直接读redis就好,这就避免了直接并发访问DB。这就是singleflight的核心思想。
singleflight实现分析
singleflight的代码源于google的groupcache项目:https://github.com/golang/groupcache/blob/41bb18bfe9da5321badc438f91158cd790a33aa3/singleflight/singleflight.go#L32
singleflight二十行代码就把可以合并相同请求的功能完成,相当优雅,这里想深入学习下其设计思路。
type call struct {
wg sync.WaitGroup // 记录一下有哪些协程在阻塞在这个请求里,通过WaitGroup控制协程进度
val interface{} // 请求的返回值
err error
}
type Group struct {
mu sync.Mutex // protects m;互斥锁,用于保护并发读写m
m map[string]*call // lazily initialized;操作的key都保存在这里
}
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
g.mu.Lock() // 注意,全程加锁
if g.m == nil {
g.m = make(map[string]*call) // 惰性分配,用的时候才把这个map分配出来
}
// 如果发现已经有相同的请求正在执行了,先释放锁,原地等待
if c, ok := g.m[key]; ok {
g.mu.Unlock()
c.wg.Wait() // 这里有点意思,相同请求存在但未返回时,其他相同的请求都会走到这里等待,此时的c.val为空值;但当wait结束往下走时,此时c,val已经变成正确的值了。
return c.val, c.err
}
// 如果没有相同的请求在执行,那么需要执行这个请求
c := new(call)
c.wg.Add(1)
g.m[key] = c // 把call对象放入map中
g.mu.Unlock() // 此时就可以解锁
c.val, c.err = fn() // 执行业务函数,返回值c.val是我们需要读取到的值,比如DB的数据
c.wg.Done() // 函数返回后,通知其他waitgroup的成员可以结束了
// 下面这一段是需要加锁的,作用是把这个函数key删除掉
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
return c.val, c.err
}
总结一下singleflight是怎么处理相同请求合并的:
- 当相同请求并发执行时,都需要先取到锁,没有锁的请求需等待锁。
- 有锁的请求需检查下map中是否有相同的请求在执行,有的话就释放锁,调用sync.WaitGroup的wait进行等待。
- 如果map中没有相同的key,则表示当前没有相同的请求正在执行,此时就直接执行该请求,把自己的操作key放入map,此时就可以释放自己持有的锁了。
- 执行业务函数,拿到返回值后就调用sync.WaitGroup的Done通知其他等待的协程可以继续往下执行。其他处于等待的协程收到消息后继续执行,向上层返回函数执行的返回值(返回值是存在map里了,因为是指针,所以此时的返回值就是函数执行后的值)。
- 此时需要重新申请锁,需要对map中自己操作的key进行删除。删除后释放锁。
这里利用singleflight模拟并发读DB,LoadDb()是个读DB操作,大概耗时1秒,我们开启了100个协程并发执行这个函数,模拟热点key不在内存,请求都往DB读数据。我们采用了singleflight策略来应对这些重复请求,防DB被打崩。
package main
import (
"sync"
"fmt"
"time"
)
type call struct {
wg sync.WaitGroup // 记录一下有哪些协程在阻塞在这个请求里,通过WaitGroup控制协程进度
val interface{} // 请求的返回值
err error
}
type Group struct {
mu sync.Mutex // protects m;互斥锁,用于保护并发读写m
m map[string]*call // lazily initialized;操作的key都保存在这里
}
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
g.mu.Lock() // 注意,全程加锁
if g.m == nil {
g.m = make(map[string]*call) // 惰性分配,用的时候才把这个map分配出来
}
// 如果发现已经有相同的请求正在执行了,先释放锁,原地等待
if c, ok := g.m[key]; ok {
g.mu.Unlock()
c.wg.Wait() // 这里有点意思,相同请求存在但未返回时,其他相同的请求都会走到这里等待,此时的c.val为空值;但当wait结束往下走时,此时c,val已经变成正确的值了。
return c.val, c.err
}
// 如果没有相同的请求在执行,那么需要执行这个请求
c := new(call)
c.wg.Add(1)
g.m[key] = c // 把call对象放入map中
g.mu.Unlock() // 此时就可以解锁
c.val, c.err = fn() // 执行业务函数,返回值c.val是我们需要读取到的值,比如DB的数据
c.wg.Done() // 函数返回后,通知其他waitgroup的成员可以结束了
// 下面这一段是需要加锁的,作用是把这个函数key删除掉
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
return c.val, c.err
}
func LoadDb() (val interface{}, err error) {
fmt.Println("LoadDb start")
time.Sleep(time.Duration(1)*time.Second)
fmt.Println("LoadDb end")
return 1, nil
}
func main() {
stop := make(chan int, 1)
g := Group{}
for i := 0; i < 100; i++ {
go func(index int) {
val, err := g.Do("read_db", LoadDb)
if err != nil {
fmt.Printf("index=%d, LoadDb err:%v\n", index, err)
return
}
fmt.Printf("index=%d, LoadDb val:%v\n", index, val)
}(i)
}
<- stop
}
输出如下
junshideMacBook-Pro:chan junshili$ go run .
LoadDb start
LoadDb end
index=23, LoadDb val:1
index=18, LoadDb val:1
index=16, LoadDb val:1
index=22, LoadDb val:1
index=24, LoadDb val:1
index=14, LoadDb val:1
index=26, LoadDb val:1
index=4, LoadDb val:1
index=27, LoadDb val:1
...
singleflight缺点一
但是这个最基础版本的singleflight是有缺陷的,比如我们执行函数时会去DB读取数据,如果发生网络波动,这个请求一直没回来怎么办,又或者这个请求卡了3秒,所有请求都会被卡3秒,此时这些等待的协程都会阻塞住,影响程序运行。所以使用singleflight记得处理好请求超时逻辑,如果请求超时了记得提前返回,不要所有协程都阻塞住了。
一个解决方式就是在loadDb上再封装一层,利用channel selcet和time.after做请求的超时控制,一旦查询超时,就马上返回,防止一直阻塞,影响程序运行。
func SingleFlightDoLoadDb(g *Group, fn func() (interface{}, error)) (val interface{}, err error){
ch := make(chan string)
defer close(ch)
Timeout := time.Second * time.Duration(1)
go func() {
val, err = g.Do("read_db", fn)
_, ok := <-ch
if !ok {
return
}
ch <- "ok"
} ()
select {
case <-time.After(Timeout):
//fmt.Printf("SingleFlightDoLoadDb handle timeout: expect within %s\n", Timeout)
return nil, errors.New("handle timeout")
case _ = <-ch:
return val,err
}
}
func main() {
stop := make(chan int, 1)
g := Group{}
for i := 0; i < 100; i++ {
go func(index int) {
val, err := SingleFlightDoLoadDb(&g, LoadDb)
if err != nil {
fmt.Printf("index=%d, LoadDb err:%v\n", index, err)
return
}
fmt.Printf("index=%d, LoadDb val:%v\n", index, val)
}(i)
}
<- stop
}
singleflight缺点二
singleflight第二个问题,是Do函数内,调用函数c.val, c.err = fn()
时有可能发生panic,导致后面的逻辑没法执行(没法delete key),因此会导致g.m[key]一直残留,后续继续执行相同的请求都会一直卡住。因此一个比较好的处理方式是利用defer来delete map key。
// 如果没有相同的请求在执行,那么需要执行这个请求
c := new(call)
c.wg.Add(1)
g.m[key] = c // 把call对象放入map中
g.mu.Unlock() // 此时就可以解锁
c.val, c.err = fn() // 执行业务函数,返回值c.val是我们需要读取到的值,比如DB的数据
c.wg.Done() // 函数返回后,通知其他waitgroup的成员可以结束了
defer func () {
// 下面这一段是需要加锁的,作用是把这个函数key删除掉
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
} ()