Interfaces
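Figure 1: Store logical diagram
The design of Store is very simple. There are two main interfaces, and they are connected through Event. The Store interface is defined in Code 1-1 and Event in Code 1-2. Every data operation in Store addresses its target with a unique, file-path-like key. The common operations such as Get, Set and Update are easy to understand, while CompareAndSwap and CompareAndDelete behave like their namesakes among atomic operations. Watch presumably observes how the data changes.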
```go
type Store interface {
	Version() int
	Index() uint64

	Get(nodePath string, recursive, sorted bool) (*Event, error)
	Set(nodePath string, dir bool, value string, expireOpts TTLOptionSet) (*Event, error)
	Update(nodePath string, newValue string, expireOpts TTLOptionSet) (*Event, error)
	Create(nodePath string, dir bool, value string, unique bool,
		expireOpts TTLOptionSet) (*Event, error)
	CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
		value string, expireOpts TTLOptionSet) (*Event, error)
	Delete(nodePath string, dir, recursive bool) (*Event, error)
	CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error)

	Watch(prefix string, recursive, stream bool, sinceIndex uint64) (Watcher, error)

	Save() ([]byte, error)
	Recovery(state []byte) error

	Clone() Store
	SaveNoCopy() ([]byte, error)

	JsonStats() []byte
	DeleteExpiredKeys(cutoff time.Time)

	HasTTLKeys() bool
}
```
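Code 1-1: Store definition
The Action field of Event is presumably the name of the operation (Get, Update and so on). NodeExtern is a tree structure; for now we assume it is tied to the unique path under which a value is stored.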
```go
type Event struct {
	Action    string      `json:"action"`
	Node      *NodeExtern `json:"node,omitempty"`
	PrevNode  *NodeExtern `json:"prevNode,omitempty"`
	EtcdIndex uint64      `json:"-"`
	Refresh   bool        `json:"refresh,omitempty"`
}
```
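Code 1-2: Event definition
NodeExtern holds the tree structure associated with a key, and Nodes (of type NodeExterns, a slice of *NodeExtern) holds the next level of child nodes. The ModifiedIndex and CreatedIndex fields can, for now, be read as a file's modification time and creation time, except that they are logical index counters rather than wall-clock times. The sort interface implementation for NodeExterns is shown in Code 1-4.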
```go
type NodeExtern struct {
	Key           string      `json:"key,omitempty"`
	Value         *string     `json:"value,omitempty"`
	Dir           bool        `json:"dir,omitempty"`
	Expiration    *time.Time  `json:"expiration,omitempty"`
	TTL           int64       `json:"ttl,omitempty"`
	Nodes         NodeExterns `json:"nodes,omitempty"`
	ModifiedIndex uint64      `json:"modifiedIndex,omitempty"`
	CreatedIndex  uint64      `json:"createdIndex,omitempty"`
}
```
Code 1-3: NodeExtern definition
```go
func (ns NodeExterns) Len() int {
	return len(ns)
}

func (ns NodeExterns) Less(i, j int) bool {
	return ns[i].Key < ns[j].Key
}

func (ns NodeExterns) Swap(i, j int) {
	ns[i], ns[j] = ns[j], ns[i]
}
```
Code 1-4: NodeExterns sort interface implementation
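To see how the JSON tags of Code 1-3 and the sort methods of Code 1-4 work together, here is a minimal sketch. It uses a trimmed copy of NodeExtern (Expiration and TTL are dropped) and two hypothetical helpers, leaf and encodeSortedDir; it illustrates the idea and is not etcd code.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// NodeExtern is a trimmed copy of Code 1-3 (Expiration and TTL omitted).
type NodeExtern struct {
	Key           string      `json:"key,omitempty"`
	Value         *string     `json:"value,omitempty"`
	Dir           bool        `json:"dir,omitempty"`
	Nodes         NodeExterns `json:"nodes,omitempty"`
	ModifiedIndex uint64      `json:"modifiedIndex,omitempty"`
	CreatedIndex  uint64      `json:"createdIndex,omitempty"`
}

type NodeExterns []*NodeExtern

// sort.Interface, ordering children by Key as in Code 1-4.
func (ns NodeExterns) Len() int           { return len(ns) }
func (ns NodeExterns) Less(i, j int) bool { return ns[i].Key < ns[j].Key }
func (ns NodeExterns) Swap(i, j int)      { ns[i], ns[j] = ns[j], ns[i] }

// leaf builds a key-value NodeExtern (hypothetical helper).
func leaf(key, value string, index uint64) *NodeExtern {
	v := value
	return &NodeExtern{Key: key, Value: &v, ModifiedIndex: index, CreatedIndex: index}
}

// encodeSortedDir sorts a directory's children by Key and returns its JSON form.
func encodeSortedDir(dir *NodeExtern) string {
	sort.Sort(dir.Nodes)
	b, err := json.Marshal(dir)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	dir := &NodeExtern{Key: "/dir", Dir: true, ModifiedIndex: 1, CreatedIndex: 1,
		Nodes: NodeExterns{leaf("/dir/b", "2", 3), leaf("/dir/a", "1", 2)}}
	fmt.Println(encodeSortedDir(dir))
}
```

Because every field carries omitempty, a key-value child encodes without "dir", and a node whose indexes are zero encodes without "modifiedIndex"/"createdIndex".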
Store
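The Store interface is implemented by the store struct, whose core components are shown in Figure 2. Root points to a tree structure similar to a file system: the string keys of a node's Children map are the names of its sub-directories (or files). If a full node path is /first/second/third/node, the lookup starts at Root and finds the node for first, then looks up the key second in that node's Children, and so on until it reaches one definite node.
WatcherHub keeps a list of watcher objects for each specific path; it contains an event queue and the watcher lists. To keep watching a specific path, a new watcher object is created on the corresponding list, and from then on every event produced by an operation that modifies that path is sent to the watcher's eventChan.
Another core component is ttlKeyHeap, which maintains a heap of nodes ordered by expiration time (the node that expires first sits at the top) and is used to control node lifetimes.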
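Figure 2: Core components of store
One of the core structures, node, is defined in Code 2-1. Each node holds a path, a creation index and a modification index, a pointer to its parent, and a Children map that locates sub-directories by name. A key-value node keeps its data in Value, while a directory uses Children. ExpireTime is used by ttlKeyHeap.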
```go
type node struct {
	Path string

	CreatedIndex  uint64
	ModifiedIndex uint64

	Parent *node `json:"-"` // should not encode this field! avoid circular dependency.

	ExpireTime time.Time
	Value      string           // for key-value pair
	Children   map[string]*node // for directory

	// A reference to the store this node is attached to.
	store *store
}
```
Code 2-1: node definition
Walk
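Every operation on a node (create, delete, update, get and so on) first has to walk the node tree to find the right node; the walk is shown in Code 2-2. The path is first split on "/" with strings.Split, and the for loop then walks the tree one component at a time. Because an absolute path starts with "/", components[0] is always empty, so the loop starts at index 1; an empty component later on (for example the trailing slash of a directory name produced by path.Split) ends the walk early and returns the current node. At each step, the custom walkFunc processes the current node and returns the next node to process.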
```go
func (s *store) walk(nodePath string, walkFunc func(prev *node, component string) (*node, *v2error.Error)) (*node, *v2error.Error) {
	components := strings.Split(nodePath, "/")

	curr := s.Root
	var err *v2error.Error

	for i := 1; i < len(components); i++ {
		if len(components[i]) == 0 { // ignore empty string
			return curr, nil
		}

		curr, err = walkFunc(curr, components[i])
		if err != nil {
			return nil, err
		}
	}

	return curr, nil
}
```
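Code 2-2: walk implementation
Take checkDir as an example of how walkFunc is used. In Code 2-3, checkDir first looks up dirName in parent.Children. If the child exists and is a directory, it is simply returned; note that the returned node becomes the prev argument of the next walkFunc call. If it exists but is not a directory, an EcodeNotDir error is returned. If it does not exist, a new directory node is created. Note the second argument passed to newDir: path.Join(parent.Path, dirName) joins the parent's Path with the directory name being looked up. This shows that a node's Path stores the full path name, while the keys of Children keep only the current directory name. For example, if the walk has reached /first/second and the directory to find is third, the new node n stores the full path /first/second/third in its Path, and the parent's Children gains the key third pointing to n, as shown in Figure 3.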
```go
func (s *store) checkDir(parent *node, dirName string) (*node, *v2error.Error) {
	node, ok := parent.Children[dirName]

	if ok {
		if node.IsDir() {
			return node, nil
		}

		return nil, v2error.NewError(v2error.EcodeNotDir, node.Path, s.CurrentIndex)
	}

	n := newDir(s, path.Join(parent.Path, dirName), s.CurrentIndex+1, parent, Permanent)

	parent.Children[dirName] = n

	return n, nil
}
```
Code 2-3: checkDir implementation
Figure 3: checkDir illustration
Create a Node
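The details of the node tree inside Store can be studied through operations on its nodes such as creation and deletion; below, node creation serves as the example. In Code 3-1, Create first takes worldLock, and the deferred closure records the outcome of the creation in the statistics (CreateSuccess or CreateFail) by inspecting err after the function body has run. Note that once internalCreate succeeds, the event's EtcdIndex is set to s.CurrentIndex, and the event is then delivered to WatcherHub through s.WatcherHub.notify(e).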
```go
func (s *store) Create(nodePath string, dir bool, value string, unique bool, expireOpts TTLOptionSet) (*Event, error) {
	var err *v2error.Error

	s.worldLock.Lock()
	defer s.worldLock.Unlock()

	defer func() {
		if err == nil {
			s.Stats.Inc(CreateSuccess)
			reportWriteSuccess(Create)
			return
		}

		s.Stats.Inc(CreateFail)
		reportWriteFailure(Create)
	}()

	e, err := s.internalCreate(nodePath, dir, value, unique, false, expireOpts.ExpireTime, Create)
	if err != nil {
		return nil, err
	}

	e.EtcdIndex = s.CurrentIndex
	s.WatcherHub.notify(e)

	return e, nil
}
```
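Code 3-1: Create implementation
As Code 3-1 shows, the key work of node creation is done in internalCreate (Code 3-2). The currIndex/nextIndex pair computed at the start and the final assignment s.CurrentIndex = nextIndex handle the various index values of the event. When unique is true, a unique path is generated from nextIndex by appending it, zero-padded to 20 characters, as a new last path component. The readonlySet check ensures that read-only paths (such as "/") are not modified. s.walk(dirName, s.checkDir) creates the directory nodes (the path without the final name) or finds those that already exist. newEvent creates the new event; its implementation is shown in Code 3-3. d.GetChild(nodeName) checks whether the node to be created already exists; if it does, the outcome depends on replace: the old file is recorded in e.PrevNode and removed, or an EcodeNodeExist error is returned. The if !dir branch creates the final node (newKV for a file, newDir for a directory), and d.Add(n) attaches it to its parent. Finally, a node that is not permanent is pushed onto ttlKeyHeap and its Expiration and TTL are filled into the event. The core Create process is illustrated in Figure 4.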
```go
func (s *store) internalCreate(nodePath string, dir bool, value string, unique, replace bool,
	expireTime time.Time, action string) (*Event, *v2error.Error) {

	currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1

	if unique { // append unique item under the node path
		nodePath += "/" + fmt.Sprintf("%020s", strconv.FormatUint(nextIndex, 10))
	}

	nodePath = path.Clean(path.Join("/", nodePath))

	// we do not allow the user to change "/"
	if s.readonlySet.Contains(nodePath) {
		return nil, v2error.NewError(v2error.EcodeRootROnly, "/", currIndex)
	}

	// Assume expire times that are way in the past are
	// This can occur when the time is serialized to JS
	if expireTime.Before(minExpireTime) {
		expireTime = Permanent
	}

	dirName, nodeName := path.Split(nodePath)

	// walk through the nodePath, create dirs and get the last directory node
	d, err := s.walk(dirName, s.checkDir)

	if err != nil {
		s.Stats.Inc(SetFail)
		reportWriteFailure(action)
		err.Index = currIndex
		return nil, err
	}

	e := newEvent(action, nodePath, nextIndex, nextIndex)
	eNode := e.Node

	n, _ := d.GetChild(nodeName)

	// force will try to replace an existing file
	if n != nil {
		if replace {
			if n.IsDir() {
				return nil, v2error.NewError(v2error.EcodeNotFile, nodePath, currIndex)
			}
			e.PrevNode = n.Repr(false, false, s.clock)

			if err := n.Remove(false, false, nil); err != nil {
				return nil, err
			}
		} else {
			return nil, v2error.NewError(v2error.EcodeNodeExist, nodePath, currIndex)
		}
	}

	if !dir { // create file
		// copy the value for safety
		valueCopy := value
		eNode.Value = &valueCopy

		n = newKV(s, nodePath, value, nextIndex, d, expireTime)

	} else { // create directory
		eNode.Dir = true

		n = newDir(s, nodePath, nextIndex, d, expireTime)
	}

	// we are sure d is a directory and does not have the children with name n.Name
	if err := d.Add(n); err != nil {
		return nil, err
	}

	// node with TTL
	if !n.IsPermanent() {
		s.ttlKeyHeap.push(n)

		eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
	}

	s.CurrentIndex = nextIndex

	return e, nil
}
```
Code 3-2: internalCreate implementation
```go
func newEvent(action string, key string, modifiedIndex, createdIndex uint64) *Event {
	n := &NodeExtern{
		Key:           key,
		ModifiedIndex: modifiedIndex,
		CreatedIndex:  createdIndex,
	}

	return &Event{
		Action: action,
		Node:   n,
	}
}
```
Code 3-3: newEvent implementation
Figure 4: Create illustration
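The path handling at the top of internalCreate can be checked on its own. In this sketch, uniquePath and normalize are hypothetical helpers. One deliberate substitution: uniquePath formats the integer with %020d, whereas the original applies %020s to the decimal string; both aim at a fixed 20-character, zero-padded name.

```go
package main

import (
	"fmt"
	"path"
)

// uniquePath appends a zero-padded index, as in the unique branch of
// internalCreate (assumption: %020d on the integer instead of %020s).
func uniquePath(nodePath string, nextIndex uint64) string {
	return nodePath + "/" + fmt.Sprintf("%020d", nextIndex)
}

// normalize mirrors path.Clean(path.Join("/", nodePath)) followed by
// path.Split, which yields the dirName handed to walk and the final nodeName.
func normalize(nodePath string) (clean, dirName, nodeName string) {
	clean = path.Clean(path.Join("/", nodePath))
	dirName, nodeName = path.Split(clean)
	return clean, dirName, nodeName
}

func main() {
	fmt.Println(uniquePath("/queue", 7))   // /queue/00000000000000000007
	fmt.Println(normalize("queue//jobs/")) // /queue/jobs /queue/ jobs
	// Zero padding keeps lexical order equal to numeric order.
	fmt.Println(uniquePath("/q", 9) < uniquePath("/q", 10)) // true
}
```

The fixed width matters because Children is a map and listings are sorted by Key as strings: without padding, "10" would sort before "9".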
Repr
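When a node is created with replace set and the target node already exists, the old node's information has to be saved on the Event object (as PrevNode) through Repr. For a directory, Repr can recurse into the children, skipping hidden nodes and sorting the result when sorted is true; for a key-value node, it copies the value out. The process is shown in Figure 5 and the implementation in Code 3-4.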
```go
func (n *node) Repr(recursive, sorted bool, clock clockwork.Clock) *NodeExtern {
	if n.IsDir() {
		node := &NodeExtern{
			Key:           n.Path,
			Dir:           true,
			ModifiedIndex: n.ModifiedIndex,
			CreatedIndex:  n.CreatedIndex,
		}
		node.Expiration, node.TTL = n.expirationAndTTL(clock)

		if !recursive {
			return node
		}

		children, _ := n.List()
		node.Nodes = make(NodeExterns, len(children))

		// we do not use the index in the children slice directly
		// we need to skip the hidden one
		i := 0

		for _, child := range children {

			if child.IsHidden() { // get will not list hidden node
				continue
			}

			node.Nodes[i] = child.Repr(recursive, sorted, clock)

			i++
		}

		// eliminate hidden nodes
		node.Nodes = node.Nodes[:i]
		if sorted {
			sort.Sort(node.Nodes)
		}

		return node
	}

	// since n.Value could be changed later, so we need to copy the value out
	value := n.Value
	node := &NodeExtern{
		Key:           n.Path,
		Value:         &value,
		ModifiedIndex: n.ModifiedIndex,
		CreatedIndex:  n.CreatedIndex,
	}
	node.Expiration, node.TTL = n.expirationAndTTL(clock)
	return node
}
```
Code 3-4: Repr implementation
Figure 5: Repr illustration
Remove
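Remove deletes an old node. A key-value node is unlinked from its parent's Children, the optional callback is invoked with its path, and, if it has a TTL, it is removed from ttlKeyHeap. A directory can only be removed when dir is true, and a non-empty directory only when recursive is true as well; in that case Remove is applied recursively to every child before the directory removes itself. See Code 3-5 for the details.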
```go
func (n *node) Remove(dir, recursive bool, callback func(path string)) *v2error.Error {
	if !n.IsDir() { // key-value pair
		_, name := path.Split(n.Path)

		// find its parent and remove the node from the map
		if n.Parent != nil && n.Parent.Children[name] == n {
			delete(n.Parent.Children, name)
		}

		if callback != nil {
			callback(n.Path)
		}

		if !n.IsPermanent() {
			n.store.ttlKeyHeap.remove(n)
		}

		return nil
	}

	if !dir {
		// cannot delete a directory without dir set to true
		return v2error.NewError(v2error.EcodeNotFile, n.Path, n.store.CurrentIndex)
	}

	if len(n.Children) != 0 && !recursive {
		// cannot delete a directory if it is not empty and the operation
		// is not recursive
		return v2error.NewError(v2error.EcodeDirNotEmpty, n.Path, n.store.CurrentIndex)
	}

	for _, child := range n.Children { // delete all children
		child.Remove(true, true, callback)
	}

	// delete self
	_, name := path.Split(n.Path)
	if n.Parent != nil && n.Parent.Children[name] == n {
		delete(n.Parent.Children, name)

		if callback != nil {
			callback(n.Path)
		}

		if !n.IsPermanent() {
			n.store.ttlKeyHeap.remove(n)
		}
	}

	return nil
}
```
Code 3-5: Remove implementation
Watch Hub
Watch
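The main parameters of the store's watch method are a key and an index, and on success it returns a watcher instance. During execution, it checks whether the event queue (eventQueue) already holds an event related to the given key at or after that index. If it does, the first matching event is sent to the returned watcher's event channel right away; otherwise, the watcher is linked into the list for that key in the watcherHub's watchers map, where it waits for future events.
Figure 6: watch illustration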
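The history lookup that watch performs can be sketched with a plain slice standing in for eventQueue. The event type and the scan and watch functions are hypothetical; the matching rule (exact key, or any key under it for a recursive watch, at or after sinceIndex) is an assumption based on the description above, and registering the watcher for future events is omitted.

```go
package main

import (
	"fmt"
	"strings"
)

// event is a hypothetical, trimmed event: the key it touched and its index.
type event struct {
	Key   string
	Index uint64
}

// scan returns the first event at or after sinceIndex whose key equals key,
// or, for a recursive watch, lies under key.
func scan(history []event, key string, recursive bool, sinceIndex uint64) (event, bool) {
	prefix := strings.TrimSuffix(key, "/") + "/" // "/foo" matches "/foo/bar", not "/foobar"
	for _, e := range history {
		if e.Index < sinceIndex {
			continue
		}
		if e.Key == key || (recursive && strings.HasPrefix(e.Key, prefix)) {
			return e, true
		}
	}
	return event{}, false
}

// watch hands back a buffered channel; a match already in history is
// delivered immediately.
func watch(history []event, key string, recursive bool, sinceIndex uint64) chan event {
	ch := make(chan event, 100)
	if e, ok := scan(history, key, recursive, sinceIndex); ok {
		ch <- e
	}
	return ch
}

func main() {
	history := []event{{"/foo", 3}, {"/foobar", 4}, {"/foo/bar", 5}}
	fmt.Println(scan(history, "/foo", true, 4)) // {/foo/bar 5} true
}
```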
Notify
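Notify uses the path of the incoming event to find all the relevant watcher instances and sends the event to each watcher's event channel in turn; at the same time, it removes one-shot (non-stream) watchers once they have been notified. See Code 4-1 and Code 4-2; notifyWatchers in Code 4-1 handles the watcher list of a single path.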
```go
func (wh *watcherHub) notifyWatchers(e *Event, nodePath string, deleted bool) {
	wh.mutex.Lock()
	defer wh.mutex.Unlock()

	l, ok := wh.watchers[nodePath]
	if ok {
		curr := l.Front()

		for curr != nil {
			next := curr.Next() // save reference to the next one in the list

			w, _ := curr.Value.(*watcher)

			originalPath := e.Node.Key == nodePath
			if (originalPath || !isHidden(nodePath, e.Node.Key)) && w.notify(e, originalPath, deleted) {
				if !w.stream { // do not remove the stream watcher
					// if we successfully notify a watcher
					// we need to remove the watcher from the list
					// and decrease the counter
					w.removed = true
					l.Remove(curr)
					atomic.AddInt64(&wh.count, -1)
					reportWatcherRemoved()
				}
			}

			curr = next // update current to the next element in the list
		}

		if l.Len() == 0 {
			// if we have notified all watcher in the list
			// we can delete the list
			delete(wh.watchers, nodePath)
		}
	}
}
```
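Code 4-1: notifyWatchers implementation
Note that a watcher's eventChan has a buffer of 100. The select statement with a default branch in Code 4-2 means that if the buffer is full, the current watcher is treated as no longer valid and is removed from the list (removing it also closes eventChan). Events already delivered to that watcher can still be received by the consuming program, so when watching for events, read them out of the channel promptly to avoid having the watcher dropped unexpectedly.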
```go
func (w *watcher) notify(e *Event, originalPath bool, deleted bool) bool {
	// watcher is interested the path in three cases and under one condition
	// the condition is that the event happens after the watcher's sinceIndex

	// 1. the path at which the event happens is the path the watcher is watching at.
	// For example if the watcher is watching at "/foo" and the event happens at "/foo",
	// the watcher must be interested in that event.

	// 2. the watcher is a recursive watcher, it interests in the event happens after
	// its watching path. For example if watcher A watches at "/foo" and it is a recursive
	// one, it will interest in the event happens at "/foo/bar".

	// 3. when we delete a directory, we need to force notify all the watchers who watches
	// at the file we need to delete.
	// For example a watcher is watching at "/foo/bar". And we deletes "/foo". The watcher
	// should get notified even if "/foo" is not the path it is watching.
	if (w.recursive || originalPath || deleted) && e.Index() >= w.sinceIndex {
		// We cannot block here if the eventChan capacity is full, otherwise
		// etcd will hang. eventChan capacity is full when the rate of
		// notifications are higher than our send rate.
		// If this happens, we close the channel.
		select {
		case w.eventChan <- e:
		default:
			// We have missed a notification. Remove the watcher.
			// Removing the watcher also closes the eventChan.
			w.remove()
		}
		return true
	}
	return false
}
```
Code 4-2: Sending a notification event
