Interfaces
图 1:Store 逻辑图
Store 的设计非常简单,主要有两个接口,两个接口通过 Event 关联起来。Store 接口定义如代码 1 - 1 所示,Event 如代码 1 - 2 所示。Store 中关于数据的操作通过类似于文件路径的形式来唯一定位,常规操作 Get、Set、Update 等容易理解,CompareAndSwap、CompareAndDelete 类似于原子操作中的对应方法。Watch 猜测为观察数据变化情况。
type Store interface {
Version() int
Index() uint64
Get(nodePath string, recursive, sorted bool) (*Event, error)
Set(nodePath string, dir bool, value string, expireOpts TTLOptionSet) (*Event, error)
Update(nodePath string, newValue string, expireOpts TTLOptionSet) (*Event, error)
Create(nodePath string, dir bool, value string, unique bool,
expireOpts TTLOptionSet) (*Event, error)
CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
value string, expireOpts TTLOptionSet) (*Event, error)
Delete(nodePath string, dir, recursive bool) (*Event, error)
CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error)
Watch(prefix string, recursive, stream bool, sinceIndex uint64) (Watcher, error)
Save() ([]byte, error)
Recovery(state []byte) error
Clone() Store
SaveNoCopy() ([]byte, error)
JsonStats() []byte
DeleteExpiredKeys(cutoff time.Time)
HasTTLKeys() bool
}
代码 1 - 1:Store 定义
Event 中 Action 猜测为 Get、Update 等,NodeExtern 为一个树状结构,暂时猜测与数值存储唯一路径关联。
type Event struct {
Action string `json:"action"`
Node *NodeExtern `json:"node,omitempty"`
PrevNode *NodeExtern `json:"prevNode,omitempty"`
EtcdIndex uint64 `json:"-"`
Refresh bool `json:"refresh,omitempty"`
}
代码 1 - 2:Event 定义
NodeExtern 保存与 Key 值相关的树状结构,Nodes 为下一级子节点,8 ~ 9 行的 Index 现在可理解为文件的修改时间和创建时间。NodeExtern 排序接口实现代码见代码 1 - 4。
type NodeExtern struct {
Key string `json:"key,omitempty"`
Value *string `json:"value,omitempty"`
Dir bool `json:"dir,omitempty"`
Expiration *time.Time `json:"expiration,omitempty"`
TTL int64 `json:"ttl,omitempty"`
Nodes NodeExterns `json:"nodes,omitempty"`
ModifiedIndex uint64 `json:"modifiedIndex,omitempty"`
CreatedIndex uint64 `json:"createdIndex,omitempty"`
}
代码 1 - 3:NodeExtern 定义
func (ns NodeExterns) Len() int {
return len(ns)
}
func (ns NodeExterns) Less(i, j int) bool {
return ns[i].Key < ns[j].Key
}
func (ns NodeExterns) Swap(i, j int) {
ns[i], ns[j] = ns[j], ns[i]
}
代码 1 - 4:NodeExtern 排序接口实现
Store
Store 接口的实现为 store 结构,其核心组件如图 2 所示。Root 指向的是一个树状结构,这个树状结构类似于文件系统,其中的 Children 对象中的 string 值为文件子目录(自文件)名,假如一个完整的几点路径为:/first/second/third/node,那么将从 Root 开始查找 first 对应的 node,再从其 Children 中查找键值为 second 的 node,直至最终找到一个确定的节点。
WatcherHub 中设置对于特定路径的观察对象列表,它包含了一个事件队列以及一个观察者链表,当要持续观察一个特定路径时,会在对应的列表上创建一个新 watcher 对象,接下来所以对该路径修改的操作产生的事件对象都会发送至 watcher 的 eventChan 中。
另一个核心组件为 ttlKeyHeap,维持了一个堆结构,用于控制 node 的生存周期,根据 node 的超时时间进行排序。
图 2:store 核心组件
核心结构之一 node 定义在代码 2 - 1 中,每个 node 包含一个路径、创建索引计数和修改索引计数,并有一个指向父节点的指针和根据子目录关键字确定的子目录树,超时时间在 ttlKeyHeap 中使用。
type node struct {
Path string
CreatedIndex uint64
ModifiedIndex uint64
Parent *node `json:"-"` // should not encode this field! avoid circular dependency.
ExpireTime time.Time
Value string // for key-value pair
Children map[string]*node // for directory
// A reference to the store this node is attached to.
store *store
}
代码 2 - 1:node 定义
Walk
对 node 节点的各种操作(增删改查等)都需要首先遍历节点树,以找到合适的节点,遍历操作在代码 2 - 2 中。第 2 行对路径进行分割操作,第 7 行开始进行节点树遍历操作,遍历时会使用自定义遍历方法 walkFunc 对当前节点进行处理,并返回下一个要处理的节点。
func (s *store) walk(nodePath string, walkFunc func(prev *node, component string) (*node, *v2error.Error)) (*node, *v2error.Error) {
components := strings.Split(nodePath, "/")
curr := s.Root
var err *v2error.Error
for i := 1; i < len(components); i++ {
if len(components[i]) == 0 { // ignore empty string
return curr, nil
}
curr, err = walkFunc(curr, components[i])
if err != nil {
return nil, err
}
}
return curr, nil
}
代码 2 - 2:walk 方法实现
以 checkDir 为例来说明 walkFunc 使用方法,在代码 2 - 3 中第 2 行,首先根据一级目录来查找子节点,如果当前节点存在且是目录节点,那么返回当前节点即可,注意,返回的当前节点会用于下次调用的参数;如果不存在,那么就创建新节点,注意第 12 行中第二个参数,使用父节点的 Path 与要查找的目录路径 dirName 进行拼接操作,这就说明 node 中的 Path 保存的是完整路径名,Children 中的键值只保留当前目录名,如果遍历到当前级别为 /first/second,要查找目录为 third,那么新节点 n 的 Path 中保存的将是全路径 first/second/third,parent 遍历的 Children 中会添加一个键值 third 保存当前新节点 n,如图 3 所示。
func (s *store) checkDir(parent *node, dirName string) (*node, *v2error.Error) {
node, ok := parent.Children[dirName]
if ok {
if node.IsDir() {
return node, nil
}
return nil, v2error.NewError(v2error.EcodeNotDir, node.Path, s.CurrentIndex)
}
n := newDir(s, path.Join(parent.Path, dirName), s.CurrentIndex+1, parent, Permanent)
parent.Children[dirName] = n
return n, nil
}
代码 2 - 3:checkDir 方法实现
图 3:checkDir 示意图
Create a Node
Store 内部的节点树细节可以通过对节点树上节点的创建、删除等操作深入研究,接下来我们以节点创建为例展示下节点树操作相关内容。代码 3 - 1 中第 7 ~ 16 行的 defer 方法作用是对创建节点动作结果进行统计。注意第 23 行对事件的 EtcdIndex 进行修改,并在 24 行将这个事件通知到 WatchHub。
func (s *store) Create(nodePath string, dir bool, value string, unique bool, expireOpts TTLOptionSet) (*Event, error) {
var err *v2error.Error
s.worldLock.Lock()
defer s.worldLock.Unlock()
defer func() {
if err == nil {
s.Stats.Inc(CreateSuccess)
reportWriteSuccess(Create)
return
}
s.Stats.Inc(CreateFail)
reportWriteFailure(Create)
}()
e, err := s.internalCreate(nodePath, dir, value, unique, false, expireOpts.ExpireTime, Create)
if err != nil {
return nil, err
}
e.EtcdIndex = s.CurrentIndex
s.WatcherHub.notify(e)
return e, nil
}
代码 3 - 1:Create 方法实现
通过代码 3 - 1 可以看到,节点创建关键方法在 internalCreate 中实现。代码 3 - 2 中第 4 行、第 81 行与事件的各种 Index 值相关;参数 unique 为 true 时,会根据 nextIndex 生成唯一路径;第 13 行用于确保只读目录不被修改;第 26 行用于创建目录(不含文件名)节点或找到已存在的目录节点;第 35 行用于创建新事件,细节实现见代码 3 - 3;第 38 行坚持要创建的节点是否存在,如果存在,根据 replace 值进行处理;第 56 ~ 67 用于创建最终节点;第 75 ~ 79 用于对非永久节点的处理。Create 核心过程如图 4 所示。
func (s *store) internalCreate(nodePath string, dir bool, value string, unique, replace bool,
expireTime time.Time, action string) (*Event, *v2error.Error) {
currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
if unique { // append unique item under the node path
nodePath += "/" + fmt.Sprintf("%020s", strconv.FormatUint(nextIndex, 10))
}
nodePath = path.Clean(path.Join("/", nodePath))
// we do not allow the user to change "/"
if s.readonlySet.Contains(nodePath) {
return nil, v2error.NewError(v2error.EcodeRootROnly, "/", currIndex)
}
// Assume expire times that are way in the past are
// This can occur when the time is serialized to JS
if expireTime.Before(minExpireTime) {
expireTime = Permanent
}
dirName, nodeName := path.Split(nodePath)
// walk through the nodePath, create dirs and get the last directory node
d, err := s.walk(dirName, s.checkDir)
if err != nil {
s.Stats.Inc(SetFail)
reportWriteFailure(action)
err.Index = currIndex
return nil, err
}
e := newEvent(action, nodePath, nextIndex, nextIndex)
eNode := e.Node
n, _ := d.GetChild(nodeName)
// force will try to replace an existing file
if n != nil {
if replace {
if n.IsDir() {
return nil, v2error.NewError(v2error.EcodeNotFile, nodePath, currIndex)
}
e.PrevNode = n.Repr(false, false, s.clock)
if err := n.Remove(false, false, nil); err != nil {
return nil, err
}
} else {
return nil, v2error.NewError(v2error.EcodeNodeExist, nodePath, currIndex)
}
}
if !dir { // create file
// copy the value for safety
valueCopy := value
eNode.Value = &valueCopy
n = newKV(s, nodePath, value, nextIndex, d, expireTime)
} else { // create directory
eNode.Dir = true
n = newDir(s, nodePath, nextIndex, d, expireTime)
}
// we are sure d is a directory and does not have the children with name n.Name
if err := d.Add(n); err != nil {
return nil, err
}
// node with TTL
if !n.IsPermanent() {
s.ttlKeyHeap.push(n)
eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
}
s.CurrentIndex = nextIndex
return e, nil
}
代码 3 - 2:internalCreate 方法实现
func newEvent(action string, key string, modifiedIndex, createdIndex uint64) *Event {
n := &NodeExtern{
Key: key,
ModifiedIndex: modifiedIndex,
CreatedIndex: createdIndex,
}
return &Event{
Action: action,
Node: n,
}
}
代码 3 - 3:newEvent 方法实现
图 4:Create 示意图
Repr
在创建节点时,如果最终节点已存在时,需要通过 Repr 将旧节点信息保存在 Event 对象上,这个过程如图 5 所示,实现方法见代码 3 - 4。
func (n *node) Repr(recursive, sorted bool, clock clockwork.Clock) *NodeExtern {
if n.IsDir() {
node := &NodeExtern{
Key: n.Path,
Dir: true,
ModifiedIndex: n.ModifiedIndex,
CreatedIndex: n.CreatedIndex,
}
node.Expiration, node.TTL = n.expirationAndTTL(clock)
if !recursive {
return node
}
children, _ := n.List()
node.Nodes = make(NodeExterns, len(children))
// we do not use the index in the children slice directly
// we need to skip the hidden one
i := 0
for _, child := range children {
if child.IsHidden() { // get will not list hidden node
continue
}
node.Nodes[i] = child.Repr(recursive, sorted, clock)
i++
}
// eliminate hidden nodes
node.Nodes = node.Nodes[:i]
if sorted {
sort.Sort(node.Nodes)
}
return node
}
// since n.Value could be changed later, so we need to copy the value out
value := n.Value
node := &NodeExtern{
Key: n.Path,
Value: &value,
ModifiedIndex: n.ModifiedIndex,
CreatedIndex: n.CreatedIndex,
}
node.Expiration, node.TTL = n.expirationAndTTL(clock)
return node
}
代码 3 - 4:Repr 方法实现
图 5:Repr 示意图
Remove
Remove 用于移除旧节点,如果旧节点包含 Children 节点,对每个 Children 节点递归执行 Remove 操作,详细过程见代码 3 - 5。
func (n *node) Remove(dir, recursive bool, callback func(path string)) *v2error.Error {
if !n.IsDir() { // key-value pair
_, name := path.Split(n.Path)
// find its parent and remove the node from the map
if n.Parent != nil && n.Parent.Children[name] == n {
delete(n.Parent.Children, name)
}
if callback != nil {
callback(n.Path)
}
if !n.IsPermanent() {
n.store.ttlKeyHeap.remove(n)
}
return nil
}
if !dir {
// cannot delete a directory without dir set to true
return v2error.NewError(v2error.EcodeNotFile, n.Path, n.store.CurrentIndex)
}
if len(n.Children) != 0 && !recursive {
// cannot delete a directory if it is not empty and the operation
// is not recursive
return v2error.NewError(v2error.EcodeDirNotEmpty, n.Path, n.store.CurrentIndex)
}
for _, child := range n.Children { // delete all children
child.Remove(true, true, callback)
}
// delete self
_, name := path.Split(n.Path)
if n.Parent != nil && n.Parent.Children[name] == n {
delete(n.Parent.Children, name)
if callback != nil {
callback(n.Path)
}
if !n.IsPermanent() {
n.store.ttlKeyHeap.remove(n)
}
}
return nil
}
代码 3 - 5:Remove 方法实现
Watch Hub
Watch
Store 的 watch 方法主要传入参数为 key 和 index,执行成功后返回一个 watcher 实例,该 watcher 同时链接在 watcherHub 实例的 watchers 字典中。执行过程中,会查找 eventQueue 中是否已存在与传入 key 值相关的事件,如果存在,则返回第一个满足条件的事件,并将该事件发送到返回的 watcher 的事件 Channel 中。
图 6:watch 示意图
Notify
Notify 功能根据传入事件的路径,找到全部的 watcher 实例,并将事件逐个发送至 watcher 的事件 Channel,同时,移除一次性 watcher 实例,参照代码 4 - 1、4 - 2。
func (wh *watcherHub) notifyWatchers(e *Event, nodePath string, deleted bool) {
wh.mutex.Lock()
defer wh.mutex.Unlock()
l, ok := wh.watchers[nodePath]
if ok {
curr := l.Front()
for curr != nil {
next := curr.Next() // save reference to the next one in the list
w, _ := curr.Value.(*watcher)
originalPath := e.Node.Key == nodePath
if (originalPath || !isHidden(nodePath, e.Node.Key)) && w.notify(e, originalPath, deleted) {
if !w.stream { // do not remove the stream watcher
// if we successfully notify a watcher
// we need to remove the watcher from the list
// and decrease the counter
w.removed = true
l.Remove(curr)
atomic.AddInt64(&wh.count, -1)
reportWatcherRemoved()
}
}
curr = next // update current to the next element in the list
}
if l.Len() == 0 {
// if we have notified all watcher in the list
// we can delete the list
delete(wh.watchers, nodePath)
}
}
}
代码 4 - 1:notify 方法实现
需要注意:watcher 的 eventChan 缓冲空间为 100,代码 22 ~ 28 含义为,如果缓冲满,默认当前 watcher 无效,将其从队列中移除,已发送到该 watcher 的事件仍然可被外部程序捕获到,因此在监听事件时,尽量先将事件读出,防止造成不必要的麻烦。
func (w *watcher) notify(e *Event, originalPath bool, deleted bool) bool {
// watcher is interested the path in three cases and under one condition
// the condition is that the event happens after the watcher's sinceIndex
// 1. the path at which the event happens is the path the watcher is watching at.
// For example if the watcher is watching at "/foo" and the event happens at "/foo",
// the watcher must be interested in that event.
// 2. the watcher is a recursive watcher, it interests in the event happens after
// its watching path. For example if watcher A watches at "/foo" and it is a recursive
// one, it will interest in the event happens at "/foo/bar".
// 3. when we delete a directory, we need to force notify all the watchers who watches
// at the file we need to delete.
// For example a watcher is watching at "/foo/bar". And we deletes "/foo". The watcher
// should get notified even if "/foo" is not the path it is watching.
if (w.recursive || originalPath || deleted) && e.Index() >= w.sinceIndex {
// We cannot block here if the eventChan capacity is full, otherwise
// etcd will hang. eventChan capacity is full when the rate of
// notifications are higher than our send rate.
// If this happens, we close the channel.
select {
case w.eventChan <- e:
default:
// We have missed a notification. Remove the watcher.
// Removing the watcher also closes the eventChan.
w.remove()
}
return true
}
return false
}
代码 4 - 2:发送通知事件