Interfaces

store.svg
图 1:Store 逻辑图

Store 的设计非常简单,主要有两个接口,两个接口通过 Event 关联起来。Store 接口定义如代码 1 - 1 所示,Event 如代码 1 - 2 所示。Store 中关于数据的操作通过类似于文件路径的形式来唯一定位,常规操作 Get、Set、Update 等容易理解,CompareAndSwap、CompareAndDelete 类似于原子操作中的对应方法。Watch 猜测为观察数据变化情况。

  1. type Store interface {
  2. Version() int
  3. Index() uint64
  4. Get(nodePath string, recursive, sorted bool) (*Event, error)
  5. Set(nodePath string, dir bool, value string, expireOpts TTLOptionSet) (*Event, error)
  6. Update(nodePath string, newValue string, expireOpts TTLOptionSet) (*Event, error)
  7. Create(nodePath string, dir bool, value string, unique bool,
  8. expireOpts TTLOptionSet) (*Event, error)
  9. CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
  10. value string, expireOpts TTLOptionSet) (*Event, error)
  11. Delete(nodePath string, dir, recursive bool) (*Event, error)
  12. CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error)
  13. Watch(prefix string, recursive, stream bool, sinceIndex uint64) (Watcher, error)
  14. Save() ([]byte, error)
  15. Recovery(state []byte) error
  16. Clone() Store
  17. SaveNoCopy() ([]byte, error)
  18. JsonStats() []byte
  19. DeleteExpiredKeys(cutoff time.Time)
  20. HasTTLKeys() bool
  21. }

代码 1 - 1:Store 定义

Event 中 Action 猜测为 Get、Update 等,NodeExtern 为一个树状结构,暂时猜测与数值存储唯一路径关联。

  1. type Event struct {
  2. Action string `json:"action"`
  3. Node *NodeExtern `json:"node,omitempty"`
  4. PrevNode *NodeExtern `json:"prevNode,omitempty"`
  5. EtcdIndex uint64 `json:"-"`
  6. Refresh bool `json:"refresh,omitempty"`
  7. }

代码 1 - 2:Event 定义

NodeExtern 保存与 Key 值相关的树状结构,Nodes 为下一级子节点,8 ~ 9 行的 Index 现在可理解为文件的修改时间和创建时间。NodeExtern 排序接口实现代码见代码 1 - 4。

  1. type NodeExtern struct {
  2. Key string `json:"key,omitempty"`
  3. Value *string `json:"value,omitempty"`
  4. Dir bool `json:"dir,omitempty"`
  5. Expiration *time.Time `json:"expiration,omitempty"`
  6. TTL int64 `json:"ttl,omitempty"`
  7. Nodes NodeExterns `json:"nodes,omitempty"`
  8. ModifiedIndex uint64 `json:"modifiedIndex,omitempty"`
  9. CreatedIndex uint64 `json:"createdIndex,omitempty"`
  10. }

代码 1 - 3:NodeExtern 定义

  1. func (ns NodeExterns) Len() int {
  2. return len(ns)
  3. }
  4. func (ns NodeExterns) Less(i, j int) bool {
  5. return ns[i].Key < ns[j].Key
  6. }
  7. func (ns NodeExterns) Swap(i, j int) {
  8. ns[i], ns[j] = ns[j], ns[i]
  9. }

代码 1 - 4:NodeExtern 排序接口实现

Store

Store 接口的实现为 store 结构,其核心组件如图 2 所示。Root 指向的是一个树状结构,这个树状结构类似于文件系统,其中的 Children 对象中的 string 值为文件子目录(自文件)名,假如一个完整的几点路径为:/first/second/third/node,那么将从 Root 开始查找 first 对应的 node,再从其 Children 中查找键值为 second 的 node,直至最终找到一个确定的节点。

WatcherHub 中设置对于特定路径的观察对象列表,它包含了一个事件队列以及一个观察者链表,当要持续观察一个特定路径时,会在对应的列表上创建一个新 watcher 对象,接下来所以对该路径修改的操作产生的事件对象都会发送至 watcher 的 eventChan 中。

另一个核心组件为 ttlKeyHeap,维持了一个堆结构,用于控制 node 的生存周期,根据 node 的超时时间进行排序。

store-store.svg
图 2:store 核心组件

核心结构之一 node 定义在代码 2 - 1 中,每个 node 包含一个路径、创建索引计数和修改索引计数,并有一个指向父节点的指针和根据子目录关键字确定的子目录树,超时时间在 ttlKeyHeap 中使用。

  1. type node struct {
  2. Path string
  3. CreatedIndex uint64
  4. ModifiedIndex uint64
  5. Parent *node `json:"-"` // should not encode this field! avoid circular dependency.
  6. ExpireTime time.Time
  7. Value string // for key-value pair
  8. Children map[string]*node // for directory
  9. // A reference to the store this node is attached to.
  10. store *store
  11. }

代码 2 - 1:node 定义

Walk

对 node 节点的各种操作(增删改查等)都需要首先遍历节点树,以找到合适的节点,遍历操作在代码 2 - 2 中。第 2 行对路径进行分割操作,第 7 行开始进行节点树遍历操作,遍历时会使用自定义遍历方法 walkFunc 对当前节点进行处理,并返回下一个要处理的节点。

  1. func (s *store) walk(nodePath string, walkFunc func(prev *node, component string) (*node, *v2error.Error)) (*node, *v2error.Error) {
  2. components := strings.Split(nodePath, "/")
  3. curr := s.Root
  4. var err *v2error.Error
  5. for i := 1; i < len(components); i++ {
  6. if len(components[i]) == 0 { // ignore empty string
  7. return curr, nil
  8. }
  9. curr, err = walkFunc(curr, components[i])
  10. if err != nil {
  11. return nil, err
  12. }
  13. }
  14. return curr, nil
  15. }

代码 2 - 2:walk 方法实现

以 checkDir 为例来说明 walkFunc 使用方法,在代码 2 - 3 中第 2 行,首先根据一级目录来查找子节点,如果当前节点存在且是目录节点,那么返回当前节点即可,注意,返回的当前节点会用于下次调用的参数;如果不存在,那么就创建新节点,注意第 12 行中第二个参数,使用父节点的 Path 与要查找的目录路径 dirName 进行拼接操作,这就说明 node 中的 Path 保存的是完整路径名,Children 中的键值只保留当前目录名,如果遍历到当前级别为 /first/second,要查找目录为 third,那么新节点 n 的 Path 中保存的将是全路径 first/second/third,parent 遍历的 Children 中会添加一个键值 third 保存当前新节点 n,如图 3 所示。

  1. func (s *store) checkDir(parent *node, dirName string) (*node, *v2error.Error) {
  2. node, ok := parent.Children[dirName]
  3. if ok {
  4. if node.IsDir() {
  5. return node, nil
  6. }
  7. return nil, v2error.NewError(v2error.EcodeNotDir, node.Path, s.CurrentIndex)
  8. }
  9. n := newDir(s, path.Join(parent.Path, dirName), s.CurrentIndex+1, parent, Permanent)
  10. parent.Children[dirName] = n
  11. return n, nil
  12. }

代码 2 - 3:checkDir 方法实现

store-check_dir.svg
图 3:checkDir 示意图

Create a Node

Store 内部的节点树细节可以通过对节点树上节点的创建、删除等操作深入研究,接下来我们以节点创建为例展示下节点树操作相关内容。代码 3 - 1 中第 7 ~ 16 行的 defer 方法作用是对创建节点动作结果进行统计。注意第 23 行对事件的 EtcdIndex 进行修改,并在 24 行将这个事件通知到 WatchHub。

  1. func (s *store) Create(nodePath string, dir bool, value string, unique bool, expireOpts TTLOptionSet) (*Event, error) {
  2. var err *v2error.Error
  3. s.worldLock.Lock()
  4. defer s.worldLock.Unlock()
  5. defer func() {
  6. if err == nil {
  7. s.Stats.Inc(CreateSuccess)
  8. reportWriteSuccess(Create)
  9. return
  10. }
  11. s.Stats.Inc(CreateFail)
  12. reportWriteFailure(Create)
  13. }()
  14. e, err := s.internalCreate(nodePath, dir, value, unique, false, expireOpts.ExpireTime, Create)
  15. if err != nil {
  16. return nil, err
  17. }
  18. e.EtcdIndex = s.CurrentIndex
  19. s.WatcherHub.notify(e)
  20. return e, nil
  21. }

代码 3 - 1:Create 方法实现

通过代码 3 - 1 可以看到,节点创建关键方法在 internalCreate 中实现。代码 3 - 2 中第 4 行、第 81 行与事件的各种 Index 值相关;参数 unique 为 true 时,会根据 nextIndex 生成唯一路径;第 13 行用于确保只读目录不被修改;第 26 行用于创建目录(不含文件名)节点或找到已存在的目录节点;第 35 行用于创建新事件,细节实现见代码 3 - 3;第 38 行坚持要创建的节点是否存在,如果存在,根据 replace 值进行处理;第 56 ~ 67 用于创建最终节点;第 75 ~ 79 用于对非永久节点的处理。Create 核心过程如图 4 所示。

  1. func (s *store) internalCreate(nodePath string, dir bool, value string, unique, replace bool,
  2. expireTime time.Time, action string) (*Event, *v2error.Error) {
  3. currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
  4. if unique { // append unique item under the node path
  5. nodePath += "/" + fmt.Sprintf("%020s", strconv.FormatUint(nextIndex, 10))
  6. }
  7. nodePath = path.Clean(path.Join("/", nodePath))
  8. // we do not allow the user to change "/"
  9. if s.readonlySet.Contains(nodePath) {
  10. return nil, v2error.NewError(v2error.EcodeRootROnly, "/", currIndex)
  11. }
  12. // Assume expire times that are way in the past are
  13. // This can occur when the time is serialized to JS
  14. if expireTime.Before(minExpireTime) {
  15. expireTime = Permanent
  16. }
  17. dirName, nodeName := path.Split(nodePath)
  18. // walk through the nodePath, create dirs and get the last directory node
  19. d, err := s.walk(dirName, s.checkDir)
  20. if err != nil {
  21. s.Stats.Inc(SetFail)
  22. reportWriteFailure(action)
  23. err.Index = currIndex
  24. return nil, err
  25. }
  26. e := newEvent(action, nodePath, nextIndex, nextIndex)
  27. eNode := e.Node
  28. n, _ := d.GetChild(nodeName)
  29. // force will try to replace an existing file
  30. if n != nil {
  31. if replace {
  32. if n.IsDir() {
  33. return nil, v2error.NewError(v2error.EcodeNotFile, nodePath, currIndex)
  34. }
  35. e.PrevNode = n.Repr(false, false, s.clock)
  36. if err := n.Remove(false, false, nil); err != nil {
  37. return nil, err
  38. }
  39. } else {
  40. return nil, v2error.NewError(v2error.EcodeNodeExist, nodePath, currIndex)
  41. }
  42. }
  43. if !dir { // create file
  44. // copy the value for safety
  45. valueCopy := value
  46. eNode.Value = &valueCopy
  47. n = newKV(s, nodePath, value, nextIndex, d, expireTime)
  48. } else { // create directory
  49. eNode.Dir = true
  50. n = newDir(s, nodePath, nextIndex, d, expireTime)
  51. }
  52. // we are sure d is a directory and does not have the children with name n.Name
  53. if err := d.Add(n); err != nil {
  54. return nil, err
  55. }
  56. // node with TTL
  57. if !n.IsPermanent() {
  58. s.ttlKeyHeap.push(n)
  59. eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
  60. }
  61. s.CurrentIndex = nextIndex
  62. return e, nil
  63. }

代码 3 - 2:internalCreate 方法实现

  1. func newEvent(action string, key string, modifiedIndex, createdIndex uint64) *Event {
  2. n := &NodeExtern{
  3. Key: key,
  4. ModifiedIndex: modifiedIndex,
  5. CreatedIndex: createdIndex,
  6. }
  7. return &Event{
  8. Action: action,
  9. Node: n,
  10. }
  11. }

代码 3 - 3:newEvent 方法实现

store-create.svg
图 4:Create 示意图

Repr

在创建节点时,如果最终节点已存在时,需要通过 Repr 将旧节点信息保存在 Event 对象上,这个过程如图 5 所示,实现方法见代码 3 - 4。

  1. func (n *node) Repr(recursive, sorted bool, clock clockwork.Clock) *NodeExtern {
  2. if n.IsDir() {
  3. node := &NodeExtern{
  4. Key: n.Path,
  5. Dir: true,
  6. ModifiedIndex: n.ModifiedIndex,
  7. CreatedIndex: n.CreatedIndex,
  8. }
  9. node.Expiration, node.TTL = n.expirationAndTTL(clock)
  10. if !recursive {
  11. return node
  12. }
  13. children, _ := n.List()
  14. node.Nodes = make(NodeExterns, len(children))
  15. // we do not use the index in the children slice directly
  16. // we need to skip the hidden one
  17. i := 0
  18. for _, child := range children {
  19. if child.IsHidden() { // get will not list hidden node
  20. continue
  21. }
  22. node.Nodes[i] = child.Repr(recursive, sorted, clock)
  23. i++
  24. }
  25. // eliminate hidden nodes
  26. node.Nodes = node.Nodes[:i]
  27. if sorted {
  28. sort.Sort(node.Nodes)
  29. }
  30. return node
  31. }
  32. // since n.Value could be changed later, so we need to copy the value out
  33. value := n.Value
  34. node := &NodeExtern{
  35. Key: n.Path,
  36. Value: &value,
  37. ModifiedIndex: n.ModifiedIndex,
  38. CreatedIndex: n.CreatedIndex,
  39. }
  40. node.Expiration, node.TTL = n.expirationAndTTL(clock)
  41. return node
  42. }

代码 3 - 4:Repr 方法实现

store-repr.svg
图 5:Repr 示意图

Remove

Remove 用于移除旧节点,如果旧节点包含 Children 节点,对每个 Children 节点递归执行 Remove 操作,详细过程见代码 3 - 5。

  1. func (n *node) Remove(dir, recursive bool, callback func(path string)) *v2error.Error {
  2. if !n.IsDir() { // key-value pair
  3. _, name := path.Split(n.Path)
  4. // find its parent and remove the node from the map
  5. if n.Parent != nil && n.Parent.Children[name] == n {
  6. delete(n.Parent.Children, name)
  7. }
  8. if callback != nil {
  9. callback(n.Path)
  10. }
  11. if !n.IsPermanent() {
  12. n.store.ttlKeyHeap.remove(n)
  13. }
  14. return nil
  15. }
  16. if !dir {
  17. // cannot delete a directory without dir set to true
  18. return v2error.NewError(v2error.EcodeNotFile, n.Path, n.store.CurrentIndex)
  19. }
  20. if len(n.Children) != 0 && !recursive {
  21. // cannot delete a directory if it is not empty and the operation
  22. // is not recursive
  23. return v2error.NewError(v2error.EcodeDirNotEmpty, n.Path, n.store.CurrentIndex)
  24. }
  25. for _, child := range n.Children { // delete all children
  26. child.Remove(true, true, callback)
  27. }
  28. // delete self
  29. _, name := path.Split(n.Path)
  30. if n.Parent != nil && n.Parent.Children[name] == n {
  31. delete(n.Parent.Children, name)
  32. if callback != nil {
  33. callback(n.Path)
  34. }
  35. if !n.IsPermanent() {
  36. n.store.ttlKeyHeap.remove(n)
  37. }
  38. }
  39. return nil
  40. }

代码 3 - 5:Remove 方法实现

Watch Hub

Watch

Store 的 watch 方法主要传入参数为 key 和 index,执行成功后返回一个 watcher 实例,该 watcher 同时链接在 watcherHub 实例的 watchers 字典中。执行过程中,会查找 eventQueue 中是否已存在与传入 key 值相关的事件,如果存在,则返回第一个满足条件的事件,并将该事件发送到返回的 watcher 的事件 Channel 中。

store-watch.svg
图 6:watch 示意图

Notify

Notify 功能根据传入事件的路径,找到全部的 watcher 实例,并将事件逐个发送至 watcher 的事件 Channel,同时,移除一次性 watcher 实例,参照代码 4 - 1、4 - 2。

  1. func (wh *watcherHub) notifyWatchers(e *Event, nodePath string, deleted bool) {
  2. wh.mutex.Lock()
  3. defer wh.mutex.Unlock()
  4. l, ok := wh.watchers[nodePath]
  5. if ok {
  6. curr := l.Front()
  7. for curr != nil {
  8. next := curr.Next() // save reference to the next one in the list
  9. w, _ := curr.Value.(*watcher)
  10. originalPath := e.Node.Key == nodePath
  11. if (originalPath || !isHidden(nodePath, e.Node.Key)) && w.notify(e, originalPath, deleted) {
  12. if !w.stream { // do not remove the stream watcher
  13. // if we successfully notify a watcher
  14. // we need to remove the watcher from the list
  15. // and decrease the counter
  16. w.removed = true
  17. l.Remove(curr)
  18. atomic.AddInt64(&wh.count, -1)
  19. reportWatcherRemoved()
  20. }
  21. }
  22. curr = next // update current to the next element in the list
  23. }
  24. if l.Len() == 0 {
  25. // if we have notified all watcher in the list
  26. // we can delete the list
  27. delete(wh.watchers, nodePath)
  28. }
  29. }
  30. }

代码 4 - 1:notify 方法实现

需要注意:watcher 的 eventChan 缓冲空间为 100,代码 22 ~ 28 含义为,如果缓冲满,默认当前 watcher 无效,将其从队列中移除,已发送到该 watcher 的事件仍然可被外部程序捕获到,因此在监听事件时,尽量先将事件读出,防止造成不必要的麻烦。

  1. func (w *watcher) notify(e *Event, originalPath bool, deleted bool) bool {
  2. // watcher is interested the path in three cases and under one condition
  3. // the condition is that the event happens after the watcher's sinceIndex
  4. // 1. the path at which the event happens is the path the watcher is watching at.
  5. // For example if the watcher is watching at "/foo" and the event happens at "/foo",
  6. // the watcher must be interested in that event.
  7. // 2. the watcher is a recursive watcher, it interests in the event happens after
  8. // its watching path. For example if watcher A watches at "/foo" and it is a recursive
  9. // one, it will interest in the event happens at "/foo/bar".
  10. // 3. when we delete a directory, we need to force notify all the watchers who watches
  11. // at the file we need to delete.
  12. // For example a watcher is watching at "/foo/bar". And we deletes "/foo". The watcher
  13. // should get notified even if "/foo" is not the path it is watching.
  14. if (w.recursive || originalPath || deleted) && e.Index() >= w.sinceIndex {
  15. // We cannot block here if the eventChan capacity is full, otherwise
  16. // etcd will hang. eventChan capacity is full when the rate of
  17. // notifications are higher than our send rate.
  18. // If this happens, we close the channel.
  19. select {
  20. case w.eventChan <- e:
  21. default:
  22. // We have missed a notification. Remove the watcher.
  23. // Removing the watcher also closes the eventChan.
  24. w.remove()
  25. }
  26. return true
  27. }
  28. return false
  29. }

代码 4 - 2:发送通知事件