我们考虑这样一个场景:我们读数据首先向redis查询,如果redis不存在我们再向DB查询,最后回写redis。利用这个缓存策略,可以避免直接并发读DB,进而保护了DB。如果我们有一个热点key,每秒有数万次读取,假设此时这个key并不存在(比如key超时失效了),我们海量的查询会绕过了redis直接向DB查询,此时DB压力瞬间上升,很有可能把DB打穿。这个情形我们一般称为缓存击穿,缓存击穿一定要在系统设计中考虑到,因为系统没有做好缓存击穿的周全应对,你的DB肯定会瘫痪。

Go中的singleflight(从字面翻译可以叫做单飞模式)是一种合并相同请求的处理方式,利用进程锁控制每个时刻最多只有一个相同的请求在执行,比如缓存击穿的场景,其实我们只要保证我们同一时刻只有一个请求去查DB,查完DB之后数据会回写到Redis,后续的请求直接读redis就好,这就避免了直接并发访问DB。这就是singleflight的核心思想。

singleflight实现分析

singleflight的代码源于google的groupcache项目:https://github.com/golang/groupcache/blob/41bb18bfe9da5321badc438f91158cd790a33aa3/singleflight/singleflight.go#L32

singleflight二十行代码就把可以合并相同请求的功能完成,相当优雅,这里想深入学习下其设计思路。

  1. type call struct {
  2. wg sync.WaitGroup // 记录一下有哪些协程在阻塞在这个请求里,通过WaitGroup控制协程进度
  3. val interface{} // 请求的返回值
  4. err error
  5. }
  6. type Group struct {
  7. mu sync.Mutex // protects m;互斥锁,用于保护并发读写m
  8. m map[string]*call // lazily initialized;操作的key都保存在这里
  9. }
  10. func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
  11. g.mu.Lock() // 注意,全程加锁
  12. if g.m == nil {
  13. g.m = make(map[string]*call) // 惰性分配,用的时候才把这个map分配出来
  14. }
  15. // 如果发现已经有相同的请求正在执行了,先释放锁,原地等待
  16. if c, ok := g.m[key]; ok {
  17. g.mu.Unlock()
  18. c.wg.Wait() // 这里有点意思,相同请求存在但未返回时,其他相同的请求都会走到这里等待,此时的c.val为空值;但当wait结束往下走时,此时c,val已经变成正确的值了。
  19. return c.val, c.err
  20. }
  21. // 如果没有相同的请求在执行,那么需要执行这个请求
  22. c := new(call)
  23. c.wg.Add(1)
  24. g.m[key] = c // 把call对象放入map中
  25. g.mu.Unlock() // 此时就可以解锁
  26. c.val, c.err = fn() // 执行业务函数,返回值c.val是我们需要读取到的值,比如DB的数据
  27. c.wg.Done() // 函数返回后,通知其他waitgroup的成员可以结束了
  28. // 下面这一段是需要加锁的,作用是把这个函数key删除掉
  29. g.mu.Lock()
  30. delete(g.m, key)
  31. g.mu.Unlock()
  32. return c.val, c.err
  33. }

总结一下singleflight是怎么处理相同请求合并的:

  1. 当相同请求并发执行时,都需要先取到锁,没有锁的请求需等待锁。
  2. 有锁的请求需检查下map中是否有相同的请求在执行,有的话就释放锁,调用sync.WaitGroup的wait进行等待。
  3. 如果map中没有相同的key,则表示当前没有相同的请求正在执行,此时就直接执行该请求,把自己的操作key放入map,此时就可以释放自己持有的锁了。
  4. 执行业务函数,拿到返回值后就调用sync.WaitGroup的Done通知其他等待的协程可以继续往下执行。其他处于等待的协程收到消息后继续执行,向上层返回函数执行的返回值(返回值是存在map里了,因为是指针,所以此时的返回值就是函数执行后的值)。
  5. 此时需要重新申请锁,需要对map中自己操作的key进行删除。删除后释放锁。

这里利用singleflight模拟并发读DB,LoadDb()是个读DB操作,大概耗时1秒,我们开启了100个协程并发执行这个函数,模拟热点key不在内存,请求都往DB读数据。我们采用了singleflight策略来应对这些重复请求,防DB被打崩。

  1. package main
  2. import (
  3. "sync"
  4. "fmt"
  5. "time"
  6. )
  7. type call struct {
  8. wg sync.WaitGroup // 记录一下有哪些协程在阻塞在这个请求里,通过WaitGroup控制协程进度
  9. val interface{} // 请求的返回值
  10. err error
  11. }
  12. type Group struct {
  13. mu sync.Mutex // protects m;互斥锁,用于保护并发读写m
  14. m map[string]*call // lazily initialized;操作的key都保存在这里
  15. }
  16. func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
  17. g.mu.Lock() // 注意,全程加锁
  18. if g.m == nil {
  19. g.m = make(map[string]*call) // 惰性分配,用的时候才把这个map分配出来
  20. }
  21. // 如果发现已经有相同的请求正在执行了,先释放锁,原地等待
  22. if c, ok := g.m[key]; ok {
  23. g.mu.Unlock()
  24. c.wg.Wait() // 这里有点意思,相同请求存在但未返回时,其他相同的请求都会走到这里等待,此时的c.val为空值;但当wait结束往下走时,此时c,val已经变成正确的值了。
  25. return c.val, c.err
  26. }
  27. // 如果没有相同的请求在执行,那么需要执行这个请求
  28. c := new(call)
  29. c.wg.Add(1)
  30. g.m[key] = c // 把call对象放入map中
  31. g.mu.Unlock() // 此时就可以解锁
  32. c.val, c.err = fn() // 执行业务函数,返回值c.val是我们需要读取到的值,比如DB的数据
  33. c.wg.Done() // 函数返回后,通知其他waitgroup的成员可以结束了
  34. // 下面这一段是需要加锁的,作用是把这个函数key删除掉
  35. g.mu.Lock()
  36. delete(g.m, key)
  37. g.mu.Unlock()
  38. return c.val, c.err
  39. }
  40. func LoadDb() (val interface{}, err error) {
  41. fmt.Println("LoadDb start")
  42. time.Sleep(time.Duration(1)*time.Second)
  43. fmt.Println("LoadDb end")
  44. return 1, nil
  45. }
  46. func main() {
  47. stop := make(chan int, 1)
  48. g := Group{}
  49. for i := 0; i < 100; i++ {
  50. go func(index int) {
  51. val, err := g.Do("read_db", LoadDb)
  52. if err != nil {
  53. fmt.Printf("index=%d, LoadDb err:%v\n", index, err)
  54. return
  55. }
  56. fmt.Printf("index=%d, LoadDb val:%v\n", index, val)
  57. }(i)
  58. }
  59. <- stop
  60. }

输出如下

  1. junshideMacBook-Pro:chan junshili$ go run .
  2. LoadDb start
  3. LoadDb end
  4. index=23, LoadDb val:1
  5. index=18, LoadDb val:1
  6. index=16, LoadDb val:1
  7. index=22, LoadDb val:1
  8. index=24, LoadDb val:1
  9. index=14, LoadDb val:1
  10. index=26, LoadDb val:1
  11. index=4, LoadDb val:1
  12. index=27, LoadDb val:1
  13. ...

singleflight缺点一

但是这个最基础版本的singleflight是有缺陷的,比如我们执行函数时会去DB读取数据,如果发生网络波动,这个请求一直没回来怎么办,又或者这个请求卡了3秒,所有请求都会被卡3秒,此时这些等待的协程都会阻塞住,影响程序运行。所以使用singleflight记得处理好请求超时逻辑,如果请求超时了记得提前返回,不要所有协程都阻塞住了。

一个解决方式就是在loadDb上再封装一层,利用channel selcet和time.after做请求的超时控制,一旦查询超时,就马上返回,防止一直阻塞,影响程序运行。

  1. func SingleFlightDoLoadDb(g *Group, fn func() (interface{}, error)) (val interface{}, err error){
  2. ch := make(chan string)
  3. defer close(ch)
  4. Timeout := time.Second * time.Duration(1)
  5. go func() {
  6. val, err = g.Do("read_db", fn)
  7. _, ok := <-ch
  8. if !ok {
  9. return
  10. }
  11. ch <- "ok"
  12. } ()
  13. select {
  14. case <-time.After(Timeout):
  15. //fmt.Printf("SingleFlightDoLoadDb handle timeout: expect within %s\n", Timeout)
  16. return nil, errors.New("handle timeout")
  17. case _ = <-ch:
  18. return val,err
  19. }
  20. }
  21. func main() {
  22. stop := make(chan int, 1)
  23. g := Group{}
  24. for i := 0; i < 100; i++ {
  25. go func(index int) {
  26. val, err := SingleFlightDoLoadDb(&g, LoadDb)
  27. if err != nil {
  28. fmt.Printf("index=%d, LoadDb err:%v\n", index, err)
  29. return
  30. }
  31. fmt.Printf("index=%d, LoadDb val:%v\n", index, val)
  32. }(i)
  33. }
  34. <- stop
  35. }

singleflight缺点二

singleflight第二个问题,是Do函数内,调用函数c.val, c.err = fn()时有可能发生panic,导致后面的逻辑没法执行(没法delete key),因此会导致g.m[key]一直残留,后续继续执行相同的请求都会一直卡住。因此一个比较好的处理方式是利用defer来delete map key。

  1. // 如果没有相同的请求在执行,那么需要执行这个请求
  2. c := new(call)
  3. c.wg.Add(1)
  4. g.m[key] = c // 把call对象放入map中
  5. g.mu.Unlock() // 此时就可以解锁
  6. c.val, c.err = fn() // 执行业务函数,返回值c.val是我们需要读取到的值,比如DB的数据
  7. c.wg.Done() // 函数返回后,通知其他waitgroup的成员可以结束了
  8. defer func () {
  9. // 下面这一段是需要加锁的,作用是把这个函数key删除掉
  10. g.mu.Lock()
  11. delete(g.m, key)
  12. g.mu.Unlock()
  13. } ()