SingleFlight总结
提供了三个参数Do同步通知、Dochan异步chan通知、Forget删除map的存在的键值
每次调用都会重置map
结构体是由mu sync.Mutex、m map[string]*call组成,mu用于操作的排它锁,m用于存储已有的键值的个数
如果m中存在当前键值对,就会阻塞等待,不存在的时候会加创建call结构存入m中
Dochan会异步执行,将结果放入chan通知其他阻塞的协程
阻塞的协程永的WaitGroup实现

  1. package main
  2. import (
  3. "time"
  4. "golang.org/x/sync/singleflight"
  5. "log"
  6. )
  7. func main() {
  8. var singleSetCache singleflight.Group
  9. getAndSetCache := func(requestID int, cacheKey string) (string, error) {
  10. log.Printf("request %v start to get and set cache...", requestID)
  11. value, _, _ := singleSetCache.Do(cacheKey, func() (ret interface{}, err error) { //do的入参key,可以直接使用缓存的key,这样同一个缓存,只有一个协程会去读DB
  12. log.Printf("request %v is setting cache...", requestID)
  13. time.Sleep(3 * time.Second)
  14. log.Printf("request %v set cache success!", requestID)
  15. return "VALUE", nil
  16. })
  17. return value.(string), nil
  18. }
  19. cacheKey := "cacheKey"
  20. for i := 1; i < 10; i++ { //模拟多个协程同时请求
  21. go func(requestID int) {
  22. value, _ := getAndSetCache(requestID, cacheKey)
  23. log.Printf("request %v get value: %v", requestID, value)
  24. }(i)
  25. }
  26. time.Sleep(20 * time.Second)
  27. for i := 1; i < 10; i++ { //模拟多个协程同时请求
  28. go func(requestID int) {
  29. value, _ := getAndSetCache(requestID, cacheKey)
  30. log.Printf("request %v get value: %v", requestID, value)
  31. }(i)
  32. }
  33. time.Sleep(20 * time.Second)
  34. }

每一次调用do都会重新开始,内存不会持久化支持的map集合

docall 的简单使用

  1. package main
  2. import (
  3. "errors"
  4. "golang.org/x/sync/singleflight"
  5. "log"
  6. "time"
  7. )
  8. func main() {
  9. var singleSetCache singleflight.Group
  10. getAndSetCache := func(requestID int, cacheKey string) (string, error) {
  11. log.Printf("request %v start to get and set cache...", requestID)
  12. retChan := singleSetCache.DoChan(cacheKey, func() (ret interface{}, err error) {
  13. log.Printf("request %v is setting cache...", requestID)
  14. time.Sleep(3 * time.Second)
  15. log.Printf("request %v set cache success!", requestID)
  16. return "VALUE", nil
  17. })
  18. var ret singleflight.Result
  19. timeout := time.After(5 * time.Second)
  20. select { //加入了超时机制
  21. case <-timeout:
  22. log.Printf("time out!")
  23. return "", errors.New("time out")
  24. case ret = <-retChan: //从chan中取出结果
  25. return ret.Val.(string), ret.Err
  26. }
  27. return "", nil
  28. }
  29. cacheKey := "cacheKey"
  30. for i := 1; i < 10; i++ {
  31. go func(requestID int) {
  32. value, _ := getAndSetCache(requestID, cacheKey)
  33. log.Printf("request %v get value: %v", requestID, value)
  34. }(i)
  35. }
  36. time.Sleep(20 * time.Second)
  37. }
  1. 每一次调用dochan都会重新开始,内存不会持久化支持的map集合,只不过将结果挡在chan中进行传输