SingleFlight总结
提供了三个参数Do同步通知、Dochan异步chan通知、Forget删除map的存在的键值
每次调用都会重置map
结构体是由mu sync.Mutex、m map[string]*call组成,mu用于操作的排它锁,m用于存储已有的键值的个数
如果m中存在当前键值对,就会阻塞等待,不存在的时候会加创建call结构存入m中
Dochan会异步执行,将结果放入chan通知其他阻塞的协程
阻塞的协程永的WaitGroup实现
package main
import (
"time"
"golang.org/x/sync/singleflight"
"log"
)
func main() {
var singleSetCache singleflight.Group
getAndSetCache := func(requestID int, cacheKey string) (string, error) {
log.Printf("request %v start to get and set cache...", requestID)
value, _, _ := singleSetCache.Do(cacheKey, func() (ret interface{}, err error) { //do的入参key,可以直接使用缓存的key,这样同一个缓存,只有一个协程会去读DB
log.Printf("request %v is setting cache...", requestID)
time.Sleep(3 * time.Second)
log.Printf("request %v set cache success!", requestID)
return "VALUE", nil
})
return value.(string), nil
}
cacheKey := "cacheKey"
for i := 1; i < 10; i++ { //模拟多个协程同时请求
go func(requestID int) {
value, _ := getAndSetCache(requestID, cacheKey)
log.Printf("request %v get value: %v", requestID, value)
}(i)
}
time.Sleep(20 * time.Second)
for i := 1; i < 10; i++ { //模拟多个协程同时请求
go func(requestID int) {
value, _ := getAndSetCache(requestID, cacheKey)
log.Printf("request %v get value: %v", requestID, value)
}(i)
}
time.Sleep(20 * time.Second)
}
docall 的简单使用
package main
import (
"errors"
"golang.org/x/sync/singleflight"
"log"
"time"
)
func main() {
var singleSetCache singleflight.Group
getAndSetCache := func(requestID int, cacheKey string) (string, error) {
log.Printf("request %v start to get and set cache...", requestID)
retChan := singleSetCache.DoChan(cacheKey, func() (ret interface{}, err error) {
log.Printf("request %v is setting cache...", requestID)
time.Sleep(3 * time.Second)
log.Printf("request %v set cache success!", requestID)
return "VALUE", nil
})
var ret singleflight.Result
timeout := time.After(5 * time.Second)
select { //加入了超时机制
case <-timeout:
log.Printf("time out!")
return "", errors.New("time out")
case ret = <-retChan: //从chan中取出结果
return ret.Val.(string), ret.Err
}
return "", nil
}
cacheKey := "cacheKey"
for i := 1; i < 10; i++ {
go func(requestID int) {
value, _ := getAndSetCache(requestID, cacheKey)
log.Printf("request %v get value: %v", requestID, value)
}(i)
}
time.Sleep(20 * time.Second)
}
每一次调用dochan都会重新开始,内存不会持久化支持的map集合,只不过将结果挡在chan中进行传输