groupcache

一句话描述

groupcache 是一个分布式缓冲库

简介

groupcache是什么?

  • groupcache是一个分布式缓冲库
  • 是Server又是 Client
  • 与其他peer相连
  • 热点缓存副本

Example

  1. git clone git@github.com:colinrs/groupcache-db-example.git
  2. cd groupcache-db-example
  3. make run

每日一库之83:groupcache - 图1

  • API服务接收来自用户的请求
  • 每一个API服务都有一个groupcache实例
  • groupcache 最终的数据源是 DBServer

代码概览

  1. ├── byteview.go # 字节操作
  2. ├── byteview_test.go
  3. ├── consistenthash #一致性hash实现
  4. ├── groupcache.go # cache操作
  5. ├── groupcache_test.go
  6. ├── groupcachepb # pb文件
  7. ├── http.go # http 服务
  8. ├── http_test.go
  9. ├── lru # LRU 实现
  10. ├── peers.go # 操作peer
  11. ├── singleflight
  12. ├── sinks.go
  13. └── testpb

核心的存储结构

Group

  1. type Group struct {
  2. name string
  3. getter Getter // 获取数据接口
  4. peersOnce sync.Once // 保证初始化一次peer
  5. peers PeerPicker // peer获取
  6. cacheBytes int64 // 对缓存大小的限制接口
  7. mainCache cache // mainCache 是分布式中本地分配到的cache部分
  8. hotCache cache // hotcache是由于访问频率高而被复制到此节点的缓存,尽管本节点不是它的拥有者。
  9. loadGroup flightGroup // 保证key只会获取一次
  10. _ int32
  11. Stats Stats
  12. }

流程分析

初始化

  1. // InitCache ...
  2. func InitCache(port string) {
  3. // HTTP Server 设置
  4. opt := &groupcache.HTTPPoolOptions{
  5. Replicas: 1, // 缓存副本
  6. BasePath: "/gouache/", // 缓存请求路径
  7. }
  8. // peers地址
  9. cacheGroupHosts := []string{"http://127.0.0.1:8001", "http://127.0.0.1:8002", "http://127.0.0.1:8003"}
  10. // peer 初始化
  11. peers := groupcache.NewHTTPPoolOpts("http://127.0.0.1:" + cachePort, opt)
  12. peerMap := consistenthash.New(opt.Replicas, opt.HashFn)
  13. peerMap.Add(cacheGroupHosts...)
  14. cacheGroup := groupcache.NewGroup("SlowDBCache", 64<<20, groupcache.GetterFunc(
  15. // 源数据获取实现
  16. ))
  17. peers.Set(cacheGroupHosts...) //设置peers地址
  18. logger.Info("cachegroup:%s slave starting on:127.0.0.1:%s",cacheGroup.Name(), cachePort)
  19. // 开启HTTP服务
  20. logger.Fatal(http.ListenAndServe(fmt.Sprintf("127.0.0.1:%s",cachePort),http.HandlerFunc(peers.ServeHTTP)))
  21. }
  • groupcache.NewHTTPPoolOpts 初始化和设置HTTP Server
  • groupcache.NewGroup 初始化和设置cache
  • consistenthash.New 初始化一致性hash,这里我是为了我们可以找到Key对应的peer才做了这个操作

缓存数据获取

  • 使用groupcache.Get 方法获取到数据
  1. func GetData(c *gin.Context) {
  2. req := new(Req)
  3. err := c.ShouldBind(req)
  4. if err!=nil{
  5. c.String(http.StatusOK, err.Error())
  6. return
  7. }
  8. var b []byte
  9. //Get方法就是groupcache获取数据的方法, b []byte 会存储获取到的值
  10. apiCacheGroup.group.Get(c.Request.Context(), req.Key, groupcache.AllocatingByteSliceSink(&b))
  11. result := map[string]interface{}{
  12. "key": req.Key,
  13. "value": string(b),
  14. }
  15. c.JSON(http.StatusOK, result)
  16. }
  • Get 方法会尝试从 mainCache 和 hotCache 中获取数据
  • 如果本地没有,则用load从数据源或者peer获取数据
  1. // Get ...
  2. func (g *Group) Get(ctx context.Context, key string, dest Sink) error {
  3. g.peersOnce.Do(g.initPeers)
  4. g.Stats.Gets.Add(1)
  5. if dest == nil {
  6. return errors.New("groupcache: nil dest Sink")
  7. }
  8. value, cacheHit := g.lookupCache(key) // 从本地的mainCache 和 hitCache 获取数据
  9. if cacheHit {
  10. g.Stats.CacheHits.Add(1)
  11. return setSinkView(dest, value)
  12. }
  13. // Optimization to avoid double unmarshalling or copying: keep
  14. // track of whether the dest was already populated. One caller
  15. // (if local) will set this; the losers will not. The common
  16. // case will likely be one caller.
  17. destPopulated := false
  18. value, destPopulated, err := g.load(ctx, key, dest) // 从数据源或者peer获取数据
  19. if err != nil {
  20. return err
  21. }
  22. if destPopulated {
  23. return nil
  24. }
  25. return setSinkView(dest, value)
  26. }
  • load 依然会从本地获取一次,因为在并发的情况下,有可能有一个协程已经将值获取到了并设置到本地缓存中
  • 然后PickPeer 获取到Key对应的Peer
  • 如果从Peer获取失败了,则用getLocally从数据源获取数据
  • 最后将数据缓存在本地
  1. // load loads key either by invoking the getter locally or by sending it to another machine.
  2. func (g *Group) load(ctx context.Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
  3. g.Stats.Loads.Add(1)
  4. viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
  5. // 再一次从本地缓存获取, 因为在并发的情况下,有可能有一个协程已经将值获取到了并设置到本地缓存中
  6. if value, cacheHit := g.lookupCache(key); cacheHit {
  7. g.Stats.CacheHits.Add(1)
  8. return value, nil
  9. }
  10. g.Stats.LoadsDeduped.Add(1)
  11. var value ByteView
  12. var err error
  13. // 获取到peer
  14. if peer, ok := g.peers.PickPeer(key); ok {
  15. // 从peer获取到数据
  16. value, err = g.getFromPeer(ctx, peer, key)
  17. if err == nil {
  18. g.Stats.PeerLoads.Add(1)
  19. return value, nil
  20. }
  21. g.Stats.PeerErrors.Add(1)
  22. // TODO(bradfitz): log the peer's error? keep
  23. // log of the past few for /groupcachez? It's
  24. // probably boring (normal task movement), so not
  25. // worth logging I imagine.
  26. }
  27. // 从数据源获取到值,也就是我们在初始化的注册的 Getter 接口
  28. value, err = g.getLocally(ctx, key, dest)
  29. if err != nil {
  30. g.Stats.LocalLoadErrs.Add(1)
  31. return nil, err
  32. }
  33. g.Stats.LocalLoads.Add(1)
  34. destPopulated = true // only one caller of load gets this return value
  35. g.populateCache(key, value, &g.mainCache) // 从数据源获取到的数据缓存在mainCache中,同时也会根据缓存大小清除hotCache中较少使用的
  36. return value, nil
  37. })
  38. if err == nil {
  39. value = viewi.(ByteView)
  40. }
  41. return
  42. }
  43. func (g *Group) getFromPeer(ctx context.Context, peer ProtoGetter, key string) (ByteView, error) {
  44. req := &pb.GetRequest{
  45. Group: &g.name,
  46. Key: &key,
  47. }
  48. res := &pb.GetResponse{}
  49. err := peer.Get(ctx, req, res) // 从perr获取,这里的peer是 httpGetter 的实例,最终是通过HTTP请求去请求peer
  50. if err != nil {
  51. return ByteView{}, err
  52. }
  53. value := ByteView{b: res.Value}
  54. // TODO(bradfitz): use res.MinuteQps or something smart to
  55. // conditionally populate hotCache. For now just do it some
  56. // percentage of the time.
  57. if rand.Intn(10) == 0 {
  58. g.populateCache(key, value, &g.hotCache) // 从peer获取到的数据是设置到hotCache中
  59. }
  60. return value, nil
  61. }

流程图如下

每日一库之83:groupcache - 图2

写入缓存流程

  • 将从数据源内容更新到mainCache缓存中
  • 将从peer获取到的数据更新到hotCache缓存中

几个有趣的点

peer的查询

  • 给定一个key,groupcache会在本地找不到缓存的情况下,查询该key应该存在的peer。
  • 为了在新增或删除peer的时候尽量少的缓存失效,groupcache使用一致性hash的方案,并提供了一个consistenthash的实现,就在consistenthash/consistenthash.go中。

我们再来看下peer的设置
  • peer的设置
  1. // 设置peer集群
  2. cacheGroupHosts := []string{"http://127.0.0.1:8001", "http://127.0.0.1:8002", "http://127.0.0.1:8003"}
  3. // 初始化本地peer
  4. peers := groupcache.NewHTTPPoolOpts("http://127.0.0.1:" + cachePort, opt)
  5. // 设置peer集群
  6. peers.Set(cacheGroupHosts...)
  7. // peer 提供HTTP 服务供其他的peer来查询数据
  8. logger.Fatal(http.ListenAndServe(fmt.Sprintf("127.0.0.1:%s",cachePort),http.HandlerFunc(peers.ServeHTTP)))
  • 初始化一致性hash
  • 初始化本地peer的Getter接口,是httpGetter实例
  • Map.Add 方法将peer地址算出一个hash值,根据设置的副本数量将peer放在hash环中对应的位置
  1. func (p *HTTPPool) Set(peers ...string) {
  2. p.mu.Lock()
  3. defer p.mu.Unlock()
  4. p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
  5. p.peers.Add(peers...)
  6. p.httpGetters = make(map[string]*httpGetter, len(peers))
  7. for _, peer := range peers {
  8. p.httpGetters[peer] = &httpGetter{transport: p.Transport, baseURL: peer + p.opts.BasePath}
  9. }
  10. }
  11. // Add adds some keys to the hash.
  12. func (m *Map) Add(keys ...string) {
  13. for _, key := range keys {
  14. for i := 0; i < m.replicas; i++ {
  15. hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
  16. m.keys = append(m.keys, hash)
  17. m.hashMap[hash] = key
  18. }
  19. }
  20. sort.Ints(m.keys)
  21. }

peer的获取
  • peer的获取主要看 consistenthash 中Map 方法
  • 首先会使用相同的hash函数算出hash值
  • 然后将hash值排序之后找出peer在hash环中位置 index
  • 最后再从hashMap中根据hash值获取到hash值对应的peer地址
  1. // Get gets the closest item in the hash to the provided key.
  2. func (m *Map) Get(key string) string {
  3. if m.IsEmpty() {
  4. return ""
  5. }
  6. hash := int(m.hash([]byte(key)))
  7. // Binary search for appropriate replica.
  8. idx := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= hash })
  9. // Means we have cycled back to the first replica.
  10. if idx == len(m.keys) {
  11. idx = 0
  12. }
  13. return m.hashMap[m.keys[idx]]
  14. }

缓存从数据源或者peer获取数据方式

  • 从数据源或者peer获取数据方式会保证对同一个Key只会有一个请求在请求数据源或者peer
  • 主要看 flightGroup的Do方法
  1. // load loads key either by invoking the getter locally or by sending it to another machine.
  2. func (g *Group) load(ctx context.Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
  3. g.Stats.Loads.Add(1)
  4. viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
  5. // 从数据源或者peer获取数据
  6. })
  7. return
  8. }
  9. // flightGroup的Do方法
  10. // 使用 mux 保证只会有一个协程在设置 g.m
  11. // g.m 用来判断是否有key存在
  12. // 使用 call (实际上就是 WaitGroup 包了一次)保证第二个请求同一个Key时需要等到前一个请求完成,直接使用前一个请求的结果就可以
  13. func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
  14. g.mu.Lock()
  15. if g.m == nil {
  16. g.m = make(map[string]*call)
  17. }
  18. if c, ok := g.m[key]; ok { //如果已经有一个key初始化了,那么只需要等到请求完成就可以了,不需要再请求
  19. g.mu.Unlock()
  20. c.wg.Wait()
  21. return c.val, c.err // 使用前一个请求的结果就可以
  22. }
  23. c := new(call) // 如果没有,则西药初始化call
  24. c.wg.Add(1)
  25. g.m[key] = c // 设置key对应的call
  26. g.mu.Unlock()
  27. c.val, c.err = fn() // 实际的业务函数
  28. c.wg.Done()
  29. g.mu.Lock()
  30. delete(g.m, key) // 删除key对应的call
  31. g.mu.Unlock()
  32. return c.val, c.err
  33. }

为何缓存没有过期时间设置

Doc

参考: